hydro_lang/
stream.rs

1use std::cell::RefCell;
2use std::future::Future;
3use std::hash::Hash;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use bytes::Bytes;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12use syn::parse_quote;
13use tokio::time::Instant;
14
15use crate::builder::FLOW_USED_MESSAGE;
16use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
17use crate::ir::{DebugInstantiate, HydroLeaf, HydroNode, TeeNode};
18use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{
21    CanSend, ExternalProcess, Location, LocationId, NoTick, Tick, check_matching_location,
22};
23use crate::staging_util::get_this_crate;
24use crate::{Bounded, Cluster, ClusterId, Optional, Singleton, Unbounded};
25
26/// Marks the stream as being totally ordered, which means that there are
27/// no sources of non-determinism (other than intentional ones) that will
28/// affect the order of elements.
29pub struct TotalOrder {}
30
31/// Marks the stream as having no order, which means that the order of
32/// elements may be affected by non-determinism.
33///
34/// This restricts certain operators, such as `fold` and `reduce`, to only
35/// be used with commutative aggregation functions.
36pub struct NoOrder {}
37
38/// Helper trait for determining the weakest of two orderings.
39#[sealed::sealed]
40pub trait MinOrder<Other> {
41    /// The weaker of the two orderings.
42    type Min;
43}
44
45#[sealed::sealed]
46impl<T> MinOrder<T> for T {
47    type Min = T;
48}
49
50#[sealed::sealed]
51impl MinOrder<NoOrder> for TotalOrder {
52    type Min = NoOrder;
53}
54
55#[sealed::sealed]
56impl MinOrder<TotalOrder> for NoOrder {
57    type Min = NoOrder;
58}
59
60/// An ordered sequence stream of elements of type `T`.
61///
62/// Type Parameters:
63/// - `Type`: the type of elements in the stream
64/// - `Loc`: the location where the stream is being materialized
65/// - `Bound`: the boundedness of the stream, which is either [`Bounded`]
66///   or [`Unbounded`]
67/// - `Order`: the ordering of the stream, which is either [`TotalOrder`]
68///   or [`NoOrder`] (default is [`TotalOrder`])
69pub struct Stream<Type, Loc, Bound, Order = TotalOrder> {
70    location: Loc,
71    pub(crate) ir_node: RefCell<HydroNode>,
72
73    _phantom: PhantomData<(Type, Loc, Bound, Order)>,
74}
75
76impl<'a, T, L, O> From<Stream<T, L, Bounded, O>> for Stream<T, L, Unbounded, O>
77where
78    L: Location<'a>,
79{
80    fn from(stream: Stream<T, L, Bounded, O>) -> Stream<T, L, Unbounded, O> {
81        Stream {
82            location: stream.location,
83            ir_node: stream.ir_node,
84            _phantom: PhantomData,
85        }
86    }
87}
88
89impl<'a, T, L, B> From<Stream<T, L, B, TotalOrder>> for Stream<T, L, B, NoOrder>
90where
91    L: Location<'a>,
92{
93    fn from(stream: Stream<T, L, B, TotalOrder>) -> Stream<T, L, B, NoOrder> {
94        Stream {
95            location: stream.location,
96            ir_node: stream.ir_node,
97            _phantom: PhantomData,
98        }
99    }
100}
101
102impl<'a, T, L, B, O> Stream<T, L, B, O>
103where
104    L: Location<'a>,
105{
106    fn location_kind(&self) -> LocationId {
107        self.location.id()
108    }
109}
110
111impl<'a, T, L, O> DeferTick for Stream<T, Tick<L>, Bounded, O>
112where
113    L: Location<'a>,
114{
115    fn defer_tick(self) -> Self {
116        Stream::defer_tick(self)
117    }
118}
119
120impl<'a, T, L, O> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O>
121where
122    L: Location<'a>,
123{
124    type Location = Tick<L>;
125
126    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
127        let location_id = location.id();
128        Stream::new(
129            location.clone(),
130            HydroNode::CycleSource {
131                ident,
132                location_kind: location_id,
133                metadata: location.new_node_metadata::<T>(),
134            },
135        )
136    }
137}
138
139impl<'a, T, L, O> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O>
140where
141    L: Location<'a>,
142{
143    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
144        assert_eq!(
145            self.location.id(),
146            expected_location,
147            "locations do not match"
148        );
149        self.location
150            .flow_state()
151            .borrow_mut()
152            .leaves
153            .as_mut()
154            .expect(FLOW_USED_MESSAGE)
155            .push(HydroLeaf::CycleSink {
156                ident,
157                location_kind: self.location_kind(),
158                input: Box::new(self.ir_node.into_inner()),
159                metadata: self.location.new_node_metadata::<T>(),
160            });
161    }
162}
163
164impl<'a, T, L, B, O> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B, O>
165where
166    L: Location<'a> + NoTick,
167{
168    type Location = L;
169
170    fn create_source(ident: syn::Ident, location: L) -> Self {
171        let location_id = location.id();
172
173        Stream::new(
174            location.clone(),
175            HydroNode::Persist {
176                inner: Box::new(HydroNode::CycleSource {
177                    ident,
178                    location_kind: location_id,
179                    metadata: location.new_node_metadata::<T>(),
180                }),
181                metadata: location.new_node_metadata::<T>(),
182            },
183        )
184    }
185}
186
187impl<'a, T, L, B, O> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B, O>
188where
189    L: Location<'a> + NoTick,
190{
191    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
192        assert_eq!(
193            self.location.id(),
194            expected_location,
195            "locations do not match"
196        );
197        let metadata = self.location.new_node_metadata::<T>();
198        self.location
199            .flow_state()
200            .borrow_mut()
201            .leaves
202            .as_mut()
203            .expect(FLOW_USED_MESSAGE)
204            .push(HydroLeaf::CycleSink {
205                ident,
206                location_kind: self.location_kind(),
207                input: Box::new(HydroNode::Unpersist {
208                    inner: Box::new(self.ir_node.into_inner()),
209                    metadata: metadata.clone(),
210                }),
211                metadata,
212            });
213    }
214}
215
216impl<'a, T, L, B, O> Stream<T, L, B, O>
217where
218    L: Location<'a>,
219{
220    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
221        Stream {
222            location,
223            ir_node: RefCell::new(ir_node),
224            _phantom: PhantomData,
225        }
226    }
227}
228
229impl<'a, T, L, B, O> Clone for Stream<T, L, B, O>
230where
231    T: Clone,
232    L: Location<'a>,
233{
234    fn clone(&self) -> Self {
235        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
236            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
237            *self.ir_node.borrow_mut() = HydroNode::Tee {
238                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
239                metadata: self.location.new_node_metadata::<T>(),
240            };
241        }
242
243        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
244            Stream {
245                location: self.location.clone(),
246                ir_node: HydroNode::Tee {
247                    inner: TeeNode(inner.0.clone()),
248                    metadata: metadata.clone(),
249                }
250                .into(),
251                _phantom: PhantomData,
252            }
253        } else {
254            unreachable!()
255        }
256    }
257}
258
259impl<'a, T, L, B, O> Stream<T, L, B, O>
260where
261    L: Location<'a>,
262{
263    /// Produces a stream based on invoking `f` on each element in order.
264    /// If you do not want to modify the stream and instead only want to view
265    /// each item use [`Stream::inspect`] instead.
266    ///
267    /// # Example
268    /// ```rust
269    /// # use hydro_lang::*;
270    /// # use futures::StreamExt;
271    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
272    /// let words = process.source_iter(q!(vec!["hello", "world"]));
273    /// words.map(q!(|x| x.to_uppercase()))
274    /// # }, |mut stream| async move {
275    /// # for w in vec!["HELLO", "WORLD"] {
276    /// #     assert_eq!(stream.next().await.unwrap(), w);
277    /// # }
278    /// # }));
279    /// ```
280    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O>
281    where
282        F: Fn(T) -> U + 'a,
283    {
284        let f = f.splice_fn1_ctx(&self.location).into();
285        Stream::new(
286            self.location.clone(),
287            HydroNode::Map {
288                f,
289                input: Box::new(self.ir_node.into_inner()),
290                metadata: self.location.new_node_metadata::<U>(),
291            },
292        )
293    }
294
295    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
296    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
297    /// for the output type `U` must produce items in a **deterministic** order.
298    ///
299    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
300    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
301    ///
302    /// # Example
303    /// ```rust
304    /// # use hydro_lang::*;
305    /// # use futures::StreamExt;
306    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
307    /// process
308    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
309    ///     .flat_map_ordered(q!(|x| x))
310    /// # }, |mut stream| async move {
311    /// // 1, 2, 3, 4
312    /// # for w in (1..5) {
313    /// #     assert_eq!(stream.next().await.unwrap(), w);
314    /// # }
315    /// # }));
316    /// ```
317    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O>
318    where
319        I: IntoIterator<Item = U>,
320        F: Fn(T) -> I + 'a,
321    {
322        let f = f.splice_fn1_ctx(&self.location).into();
323        Stream::new(
324            self.location.clone(),
325            HydroNode::FlatMap {
326                f,
327                input: Box::new(self.ir_node.into_inner()),
328                metadata: self.location.new_node_metadata::<U>(),
329            },
330        )
331    }
332
333    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
334    /// for the output type `U` to produce items in any order.
335    ///
336    /// # Example
337    /// ```rust
338    /// # use hydro_lang::*;
339    /// # use futures::StreamExt;
340    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder>(|process| {
341    /// process
342    ///     .source_iter(q!(vec![
343    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
344    ///         std::collections::HashSet::from_iter(vec![3, 4]),
345    ///     ]))
346    ///     .flat_map_unordered(q!(|x| x))
347    /// # }, |mut stream| async move {
348    /// // 1, 2, 3, 4, but in no particular order
349    /// # let mut results = Vec::new();
350    /// # for w in (1..5) {
351    /// #     results.push(stream.next().await.unwrap());
352    /// # }
353    /// # results.sort();
354    /// # assert_eq!(results, vec![1, 2, 3, 4]);
355    /// # }));
356    /// ```
357    pub fn flat_map_unordered<U, I, F>(
358        self,
359        f: impl IntoQuotedMut<'a, F, L>,
360    ) -> Stream<U, L, B, NoOrder>
361    where
362        I: IntoIterator<Item = U>,
363        F: Fn(T) -> I + 'a,
364    {
365        let f = f.splice_fn1_ctx(&self.location).into();
366        Stream::new(
367            self.location.clone(),
368            HydroNode::FlatMap {
369                f,
370                input: Box::new(self.ir_node.into_inner()),
371                metadata: self.location.new_node_metadata::<U>(),
372            },
373        )
374    }
375
376    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
377    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
378    ///
379    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
380    /// not deterministic, use [`Stream::flatten_unordered`] instead.
381    ///
382    /// ```rust
383    /// # use hydro_lang::*;
384    /// # use futures::StreamExt;
385    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
386    /// process
387    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
388    ///     .flatten_ordered()
389    /// # }, |mut stream| async move {
390    /// // 1, 2, 3, 4
391    /// # for w in (1..5) {
392    /// #     assert_eq!(stream.next().await.unwrap(), w);
393    /// # }
394    /// # }));
395    /// ```
396    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O>
397    where
398        T: IntoIterator<Item = U>,
399    {
400        self.flat_map_ordered(q!(|d| d))
401    }
402
403    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
404    /// for the element type `T` to produce items in any order.
405    ///
406    /// # Example
407    /// ```rust
408    /// # use hydro_lang::*;
409    /// # use futures::StreamExt;
410    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder>(|process| {
411    /// process
412    ///     .source_iter(q!(vec![
413    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
414    ///         std::collections::HashSet::from_iter(vec![3, 4]),
415    ///     ]))
416    ///     .flatten_unordered()
417    /// # }, |mut stream| async move {
418    /// // 1, 2, 3, 4, but in no particular order
419    /// # let mut results = Vec::new();
420    /// # for w in (1..5) {
421    /// #     results.push(stream.next().await.unwrap());
422    /// # }
423    /// # results.sort();
424    /// # assert_eq!(results, vec![1, 2, 3, 4]);
425    /// # }));
426    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder>
427    where
428        T: IntoIterator<Item = U>,
429    {
430        self.flat_map_unordered(q!(|d| d))
431    }
432
433    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
434    /// `f`, preserving the order of the elements.
435    ///
436    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
437    /// not modify or take ownership of the values. If you need to modify the values while filtering
438    /// use [`Stream::filter_map`] instead.
439    ///
440    /// # Example
441    /// ```rust
442    /// # use hydro_lang::*;
443    /// # use futures::StreamExt;
444    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
445    /// process
446    ///     .source_iter(q!(vec![1, 2, 3, 4]))
447    ///     .filter(q!(|&x| x > 2))
448    /// # }, |mut stream| async move {
449    /// // 3, 4
450    /// # for w in (3..5) {
451    /// #     assert_eq!(stream.next().await.unwrap(), w);
452    /// # }
453    /// # }));
454    /// ```
455    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O>
456    where
457        F: Fn(&T) -> bool + 'a,
458    {
459        let f = f.splice_fn1_borrow_ctx(&self.location).into();
460        Stream::new(
461            self.location.clone(),
462            HydroNode::Filter {
463                f,
464                input: Box::new(self.ir_node.into_inner()),
465                metadata: self.location.new_node_metadata::<T>(),
466            },
467        )
468    }
469
470    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
471    ///
472    /// # Example
473    /// ```rust
474    /// # use hydro_lang::*;
475    /// # use futures::StreamExt;
476    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
477    /// process
478    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
479    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
480    /// # }, |mut stream| async move {
481    /// // 1, 2
482    /// # for w in (1..3) {
483    /// #     assert_eq!(stream.next().await.unwrap(), w);
484    /// # }
485    /// # }));
486    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O>
487    where
488        F: Fn(T) -> Option<U> + 'a,
489    {
490        let f = f.splice_fn1_ctx(&self.location).into();
491        Stream::new(
492            self.location.clone(),
493            HydroNode::FilterMap {
494                f,
495                input: Box::new(self.ir_node.into_inner()),
496                metadata: self.location.new_node_metadata::<U>(),
497            },
498        )
499    }
500
501    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
502    /// where `x` is the final value of `other`, a bounded [`Singleton`].
503    ///
504    /// # Example
505    /// ```rust
506    /// # use hydro_lang::*;
507    /// # use futures::StreamExt;
508    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
509    /// let tick = process.tick();
510    /// let batch = unsafe {
511    ///     process
512    ///         .source_iter(q!(vec![1, 2, 3, 4]))
513    ///         .tick_batch(&tick)
514    /// };
515    /// let count = batch.clone().count(); // `count()` returns a singleton
516    /// batch.cross_singleton(count).all_ticks()
517    /// # }, |mut stream| async move {
518    /// // (1, 4), (2, 4), (3, 4), (4, 4)
519    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
520    /// #     assert_eq!(stream.next().await.unwrap(), w);
521    /// # }
522    /// # }));
523    pub fn cross_singleton<O2>(
524        self,
525        other: impl Into<Optional<O2, L, Bounded>>,
526    ) -> Stream<(T, O2), L, B, O>
527    where
528        O2: Clone,
529    {
530        let other: Optional<O2, L, Bounded> = other.into();
531        check_matching_location(&self.location, &other.location);
532
533        Stream::new(
534            self.location.clone(),
535            HydroNode::CrossSingleton {
536                left: Box::new(self.ir_node.into_inner()),
537                right: Box::new(other.ir_node.into_inner()),
538                metadata: self.location.new_node_metadata::<(T, O2)>(),
539            },
540        )
541    }
542
543    /// Allow this stream through if the argument (a Bounded Optional) is non-empty, otherwise the output is empty.
544    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O> {
545        self.cross_singleton(signal.map(q!(|_u| ())))
546            .map(q!(|(d, _signal)| d))
547    }
548
549    /// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty.
550    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O> {
551        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
552    }
553
554    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
555    /// tupled pairs in a non-deterministic order.
556    ///
557    /// # Example
558    /// ```rust
559    /// # use hydro_lang::*;
560    /// # use std::collections::HashSet;
561    /// # use futures::StreamExt;
562    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
563    /// let tick = process.tick();
564    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
565    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
566    /// stream1.cross_product(stream2)
567    /// # }, |mut stream| async move {
568    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
569    /// # stream.map(|i| assert!(expected.contains(&i)));
570    /// # }));
571    pub fn cross_product<O2>(self, other: Stream<O2, L, B, O>) -> Stream<(T, O2), L, B, NoOrder>
572    where
573        T: Clone,
574        O2: Clone,
575    {
576        check_matching_location(&self.location, &other.location);
577
578        Stream::new(
579            self.location.clone(),
580            HydroNode::CrossProduct {
581                left: Box::new(self.ir_node.into_inner()),
582                right: Box::new(other.ir_node.into_inner()),
583                metadata: self.location.new_node_metadata::<(T, O2)>(),
584            },
585        )
586    }
587
588    /// Takes one stream as input and filters out any duplicate occurrences. The output
589    /// contains all unique values from the input.
590    ///
591    /// # Example
592    /// ```rust
593    /// # use hydro_lang::*;
594    /// # use futures::StreamExt;
595    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
596    /// let tick = process.tick();
597    ///     process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
598    /// # }, |mut stream| async move {
599    /// # for w in vec![1, 2, 3, 4] {
600    /// #     assert_eq!(stream.next().await.unwrap(), w);
601    /// # }
602    /// # }));
603    pub fn unique(self) -> Stream<T, L, B, O>
604    where
605        T: Eq + Hash,
606    {
607        Stream::new(
608            self.location.clone(),
609            HydroNode::Unique {
610                input: Box::new(self.ir_node.into_inner()),
611                metadata: self.location.new_node_metadata::<T>(),
612            },
613        )
614    }
615
616    /// Outputs everything in this stream that is *not* contained in the `other` stream.
617    ///
618    /// The `other` stream must be [`Bounded`], since this function will wait until
619    /// all its elements are available before producing any output.
620    /// # Example
621    /// ```rust
622    /// # use hydro_lang::*;
623    /// # use futures::StreamExt;
624    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
625    /// let tick = process.tick();
626    /// let stream = unsafe {
627    ///    process
628    ///    .source_iter(q!(vec![ 1, 2, 3, 4 ]))
629    ///    .tick_batch(&tick)
630    /// };
631    /// let batch = unsafe {
632    ///     process
633    ///         .source_iter(q!(vec![1, 2]))
634    ///         .tick_batch(&tick)
635    /// };
636    /// stream.filter_not_in(batch).all_ticks()
637    /// # }, |mut stream| async move {
638    /// # for w in vec![3, 4] {
639    /// #     assert_eq!(stream.next().await.unwrap(), w);
640    /// # }
641    /// # }));
642    pub fn filter_not_in<O2>(self, other: Stream<T, L, Bounded, O2>) -> Stream<T, L, Bounded, O>
643    where
644        T: Eq + Hash,
645    {
646        check_matching_location(&self.location, &other.location);
647
648        Stream::new(
649            self.location.clone(),
650            HydroNode::Difference {
651                pos: Box::new(self.ir_node.into_inner()),
652                neg: Box::new(other.ir_node.into_inner()),
653                metadata: self.location.new_node_metadata::<T>(),
654            },
655        )
656    }
657
658    /// An operator which allows you to "inspect" each element of a stream without
659    /// modifying it. The closure `f` is called on a reference to each item. This is
660    /// mainly useful for debugging, and should not be used to generate side-effects.
661    ///
662    /// # Example
663    /// ```rust
664    /// # use hydro_lang::*;
665    /// # use futures::StreamExt;
666    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
667    /// let nums = process.source_iter(q!(vec![1, 2]));
668    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
669    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
670    /// # }, |mut stream| async move {
671    /// # for w in vec![1, 2] {
672    /// #     assert_eq!(stream.next().await.unwrap(), w);
673    /// # }
674    /// # }));
675    /// ```
676    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O>
677    where
678        F: Fn(&T) + 'a,
679    {
680        let f = f.splice_fn1_borrow_ctx(&self.location).into();
681
682        if L::is_top_level() {
683            Stream::new(
684                self.location.clone(),
685                HydroNode::Persist {
686                    inner: Box::new(HydroNode::Inspect {
687                        f,
688                        input: Box::new(HydroNode::Unpersist {
689                            inner: Box::new(self.ir_node.into_inner()),
690                            metadata: self.location.new_node_metadata::<T>(),
691                        }),
692                        metadata: self.location.new_node_metadata::<T>(),
693                    }),
694                    metadata: self.location.new_node_metadata::<T>(),
695                },
696            )
697        } else {
698            Stream::new(
699                self.location.clone(),
700                HydroNode::Inspect {
701                    f,
702                    input: Box::new(self.ir_node.into_inner()),
703                    metadata: self.location.new_node_metadata::<T>(),
704                },
705            )
706        }
707    }
708
709    /// Explicitly "casts" the stream to a type with a different ordering
710    /// guarantee. Useful in unsafe code where the ordering cannot be proven
711    /// by the type-system.
712    ///
713    /// # Safety
714    /// This function is used as an escape hatch, and any mistakes in the
715    /// provided ordering guarantee will propagate into the guarantees
716    /// for the rest of the program.
717    ///
718    /// # Example
719    /// # TODO: more sensible code after Shadaj merges
720    /// ```rust
721    /// # use hydro_lang::*;
722    /// # use std::collections::HashSet;
723    /// # use futures::StreamExt;
724    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
725    /// let nums = process.source_iter(q!({
726    ///     let now = std::time::SystemTime::now();
727    ///     match now.elapsed().unwrap().as_secs() % 2 {
728    ///         0 => vec![5, 4, 3, 2, 1],
729    ///         _ => vec![1, 2, 3, 4, 5],
730    ///     }
731    ///     .into_iter()
732    /// }));
733    /// // despite being generated by `source_iter`, the order of `nums` across runs is non-deterministic
734    /// let stream = unsafe { nums.assume_ordering::<NoOrder>() };
735    /// stream
736    /// # }, |mut stream| async move {
737    /// # for w in vec![1, 2, 3, 4, 5] {
738    /// #     assert!((1..=5).contains(&stream.next().await.unwrap()));
739    /// # }
740    /// # }));
741    /// ```
742    pub unsafe fn assume_ordering<O2>(self) -> Stream<T, L, B, O2> {
743        Stream::new(self.location, self.ir_node.into_inner())
744    }
745}
746
747impl<'a, T, L, B, O> Stream<&T, L, B, O>
748where
749    L: Location<'a>,
750{
751    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
752    ///
753    /// # Example
754    /// ```rust
755    /// # use hydro_lang::*;
756    /// # use futures::StreamExt;
757    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
758    /// process.source_iter(q!(&[1, 2, 3])).cloned()
759    /// # }, |mut stream| async move {
760    /// // 1, 2, 3
761    /// # for w in vec![1, 2, 3] {
762    /// #     assert_eq!(stream.next().await.unwrap(), w);
763    /// # }
764    /// # }));
765    /// ```
766    pub fn cloned(self) -> Stream<T, L, B, O>
767    where
768        T: Clone,
769    {
770        self.map(q!(|d| d.clone()))
771    }
772}
773
774impl<'a, T, L, B, O> Stream<T, L, B, O>
775where
776    L: Location<'a>,
777    O: MinOrder<NoOrder, Min = NoOrder>,
778{
779    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
780    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
781    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
782    ///
783    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
784    ///
785    /// # Example
786    /// ```rust
787    /// # use hydro_lang::*;
788    /// # use futures::StreamExt;
789    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
790    /// let tick = process.tick();
791    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
792    /// let batch = unsafe { numbers.tick_batch(&tick) };
793    /// batch
794    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
795    ///     .all_ticks()
796    /// # }, |mut stream| async move {
797    /// // 10
798    /// # assert_eq!(stream.next().await.unwrap(), 10);
799    /// # }));
800    /// ```
801    pub fn fold_commutative<A, I, F>(
802        self,
803        init: impl IntoQuotedMut<'a, I, L>,
804        comb: impl IntoQuotedMut<'a, F, L>,
805    ) -> Singleton<A, L, B>
806    where
807        I: Fn() -> A + 'a,
808        F: Fn(&mut A, T),
809    {
810        let init = init.splice_fn0_ctx(&self.location).into();
811        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
812
813        let mut core = HydroNode::Fold {
814            init,
815            acc: comb,
816            input: Box::new(self.ir_node.into_inner()),
817            metadata: self.location.new_node_metadata::<A>(),
818        };
819
820        if L::is_top_level() {
821            // top-level (possibly unbounded) singletons are represented as
822            // a stream which produces all values from all ticks every tick,
823            // so Unpersist will always give the lastest aggregation
824            core = HydroNode::Persist {
825                inner: Box::new(core),
826                metadata: self.location.new_node_metadata::<A>(),
827            };
828        }
829
830        Singleton::new(self.location, core)
831    }
832
833    /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
834    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
835    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
836    /// reference, so that it can be modified in place.
837    ///
838    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
839    ///
840    /// # Example
841    /// ```rust
842    /// # use hydro_lang::*;
843    /// # use futures::StreamExt;
844    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
845    /// let tick = process.tick();
846    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
847    /// let batch = unsafe { numbers.tick_batch(&tick) };
848    /// batch
849    ///     .reduce_commutative(q!(|curr, new| *curr += new))
850    ///     .all_ticks()
851    /// # }, |mut stream| async move {
852    /// // 10
853    /// # assert_eq!(stream.next().await.unwrap(), 10);
854    /// # }));
855    /// ```
856    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
857    where
858        F: Fn(&mut T, T) + 'a,
859    {
860        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
861        let mut core = HydroNode::Reduce {
862            f,
863            input: Box::new(self.ir_node.into_inner()),
864            metadata: self.location.new_node_metadata::<T>(),
865        };
866
867        if L::is_top_level() {
868            core = HydroNode::Persist {
869                inner: Box::new(core),
870                metadata: self.location.new_node_metadata::<T>(),
871            };
872        }
873
874        Optional::new(self.location, core)
875    }
876
877    /// Computes the maximum element in the stream as an [`Optional`], which
878    /// will be empty until the first element in the input arrives.
879    ///
880    /// # Example
881    /// ```rust
882    /// # use hydro_lang::*;
883    /// # use futures::StreamExt;
884    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
885    /// let tick = process.tick();
886    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
887    /// let batch = unsafe { numbers.tick_batch(&tick) };
888    /// batch.max().all_ticks()
889    /// # }, |mut stream| async move {
890    /// // 4
891    /// # assert_eq!(stream.next().await.unwrap(), 4);
892    /// # }));
893    /// ```
894    pub fn max(self) -> Optional<T, L, B>
895    where
896        T: Ord,
897    {
898        self.reduce_commutative(q!(|curr, new| {
899            if new > *curr {
900                *curr = new;
901            }
902        }))
903    }
904
905    /// Computes the maximum element in the stream as an [`Optional`], where the
906    /// maximum is determined according to the `key` function. The [`Optional`] will
907    /// be empty until the first element in the input arrives.
908    ///
909    /// # Example
910    /// ```rust
911    /// # use hydro_lang::*;
912    /// # use futures::StreamExt;
913    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
914    /// let tick = process.tick();
915    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
916    /// let batch = unsafe { numbers.tick_batch(&tick) };
917    /// batch.max_by_key(q!(|x| -x)).all_ticks()
918    /// # }, |mut stream| async move {
919    /// // 1
920    /// # assert_eq!(stream.next().await.unwrap(), 1);
921    /// # }));
922    /// ```
923    pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
924    where
925        K: Ord,
926        F: Fn(&T) -> K + 'a,
927    {
928        let f = key.splice_fn1_borrow_ctx(&self.location);
929
930        let wrapped: syn::Expr = parse_quote!({
931            let key_fn = #f;
932            move |curr, new| {
933                if key_fn(&new) > key_fn(&*curr) {
934                    *curr = new;
935                }
936            }
937        });
938
939        let mut core = HydroNode::Reduce {
940            f: wrapped.into(),
941            input: Box::new(self.ir_node.into_inner()),
942            metadata: self.location.new_node_metadata::<T>(),
943        };
944
945        if L::is_top_level() {
946            core = HydroNode::Persist {
947                inner: Box::new(core),
948                metadata: self.location.new_node_metadata::<T>(),
949            };
950        }
951
952        Optional::new(self.location, core)
953    }
954
955    /// Computes the minimum element in the stream as an [`Optional`], which
956    /// will be empty until the first element in the input arrives.
957    ///
958    /// # Example
959    /// ```rust
960    /// # use hydro_lang::*;
961    /// # use futures::StreamExt;
962    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
963    /// let tick = process.tick();
964    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
965    /// let batch = unsafe { numbers.tick_batch(&tick) };
966    /// batch.min().all_ticks()
967    /// # }, |mut stream| async move {
968    /// // 1
969    /// # assert_eq!(stream.next().await.unwrap(), 1);
970    /// # }));
971    /// ```
972    pub fn min(self) -> Optional<T, L, B>
973    where
974        T: Ord,
975    {
976        self.reduce_commutative(q!(|curr, new| {
977            if new < *curr {
978                *curr = new;
979            }
980        }))
981    }
982
983    /// Computes the number of elements in the stream as a [`Singleton`].
984    ///
985    /// # Example
986    /// ```rust
987    /// # use hydro_lang::*;
988    /// # use futures::StreamExt;
989    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
990    /// let tick = process.tick();
991    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
992    /// let batch = unsafe { numbers.tick_batch(&tick) };
993    /// batch.count().all_ticks()
994    /// # }, |mut stream| async move {
995    /// // 4
996    /// # assert_eq!(stream.next().await.unwrap(), 4);
997    /// # }));
998    /// ```
999    pub fn count(self) -> Singleton<usize, L, B> {
1000        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1001    }
1002}
1003
1004impl<'a, T, L, B> Stream<T, L, B, TotalOrder>
1005where
1006    L: Location<'a>,
1007{
1008    /// Returns a stream with the current count tupled with each element in the input stream.
1009    ///
1010    /// # Example
1011    /// ```rust
1012    /// # use hydro_lang::*;
1013    /// # use futures::StreamExt;
1014    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder>(|process| {
1015    /// let tick = process.tick();
1016    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1017    /// numbers.enumerate()
1018    /// # }, |mut stream| async move {
1019    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1020    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1021    /// #     assert_eq!(stream.next().await.unwrap(), w);
1022    /// # }
1023    /// # }));
1024    /// ```
1025    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder> {
1026        if L::is_top_level() {
1027            Stream::new(
1028                self.location.clone(),
1029                HydroNode::Persist {
1030                    inner: Box::new(HydroNode::Enumerate {
1031                        is_static: true,
1032                        input: Box::new(HydroNode::Unpersist {
1033                            inner: Box::new(self.ir_node.into_inner()),
1034                            metadata: self.location.new_node_metadata::<T>(),
1035                        }),
1036                        metadata: self.location.new_node_metadata::<(usize, T)>(),
1037                    }),
1038                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1039                },
1040            )
1041        } else {
1042            Stream::new(
1043                self.location.clone(),
1044                HydroNode::Enumerate {
1045                    is_static: false,
1046                    input: Box::new(self.ir_node.into_inner()),
1047                    metadata: self.location.new_node_metadata::<(usize, T)>(),
1048                },
1049            )
1050        }
1051    }
1052
1053    /// Computes the first element in the stream as an [`Optional`], which
1054    /// will be empty until the first element in the input arrives.
1055    ///
1056    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1057    /// re-ordering of elements may cause the first element to change.
1058    ///
1059    /// # Example
1060    /// ```rust
1061    /// # use hydro_lang::*;
1062    /// # use futures::StreamExt;
1063    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1064    /// let tick = process.tick();
1065    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1066    /// let batch = unsafe { numbers.tick_batch(&tick) };
1067    /// batch.first().all_ticks()
1068    /// # }, |mut stream| async move {
1069    /// // 1
1070    /// # assert_eq!(stream.next().await.unwrap(), 1);
1071    /// # }));
1072    /// ```
1073    pub fn first(self) -> Optional<T, L, B> {
1074        Optional::new(self.location, self.ir_node.into_inner())
1075    }
1076
1077    /// Computes the last element in the stream as an [`Optional`], which
1078    /// will be empty until an element in the input arrives.
1079    ///
1080    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1081    /// re-ordering of elements may cause the last element to change.
1082    ///
1083    /// # Example
1084    /// ```rust
1085    /// # use hydro_lang::*;
1086    /// # use futures::StreamExt;
1087    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1088    /// let tick = process.tick();
1089    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1090    /// let batch = unsafe { numbers.tick_batch(&tick) };
1091    /// batch.last().all_ticks()
1092    /// # }, |mut stream| async move {
1093    /// // 4
1094    /// # assert_eq!(stream.next().await.unwrap(), 4);
1095    /// # }));
1096    /// ```
1097    pub fn last(self) -> Optional<T, L, B> {
1098        self.reduce(q!(|curr, new| *curr = new))
1099    }
1100
1101    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1102    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1103    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1104    ///
1105    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1106    /// to depend on the order of elements in the stream.
1107    ///
1108    /// # Example
1109    /// ```rust
1110    /// # use hydro_lang::*;
1111    /// # use futures::StreamExt;
1112    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1113    /// let tick = process.tick();
1114    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1115    /// let batch = unsafe { words.tick_batch(&tick) };
1116    /// batch
1117    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1118    ///     .all_ticks()
1119    /// # }, |mut stream| async move {
1120    /// // "HELLOWORLD"
1121    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1122    /// # }));
1123    /// ```
1124    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1125        self,
1126        init: impl IntoQuotedMut<'a, I, L>,
1127        comb: impl IntoQuotedMut<'a, F, L>,
1128    ) -> Singleton<A, L, B> {
1129        let init = init.splice_fn0_ctx(&self.location).into();
1130        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1131
1132        let mut core = HydroNode::Fold {
1133            init,
1134            acc: comb,
1135            input: Box::new(self.ir_node.into_inner()),
1136            metadata: self.location.new_node_metadata::<A>(),
1137        };
1138
1139        if L::is_top_level() {
1140            // top-level (possibly unbounded) singletons are represented as
1141            // a stream which produces all values from all ticks every tick,
1142            // so Unpersist will always give the lastest aggregation
1143            core = HydroNode::Persist {
1144                inner: Box::new(core),
1145                metadata: self.location.new_node_metadata::<A>(),
1146            };
1147        }
1148
1149        Singleton::new(self.location, core)
1150    }
1151
1152    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1153    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1154    /// until the first element in the input arrives.
1155    ///
1156    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1157    /// to depend on the order of elements in the stream.
1158    ///
1159    /// # Example
1160    /// ```rust
1161    /// # use hydro_lang::*;
1162    /// # use futures::StreamExt;
1163    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1164    /// let tick = process.tick();
1165    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1166    /// let batch = unsafe { words.tick_batch(&tick) };
1167    /// batch
1168    ///     .map(q!(|x| x.to_string()))
1169    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1170    ///     .all_ticks()
1171    /// # }, |mut stream| async move {
1172    /// // "HELLOWORLD"
1173    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1174    /// # }));
1175    /// ```
1176    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1177        self,
1178        comb: impl IntoQuotedMut<'a, F, L>,
1179    ) -> Optional<T, L, B> {
1180        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1181        let mut core = HydroNode::Reduce {
1182            f,
1183            input: Box::new(self.ir_node.into_inner()),
1184            metadata: self.location.new_node_metadata::<T>(),
1185        };
1186
1187        if L::is_top_level() {
1188            core = HydroNode::Persist {
1189                inner: Box::new(core),
1190                metadata: self.location.new_node_metadata::<T>(),
1191            };
1192        }
1193
1194        Optional::new(self.location, core)
1195    }
1196}
1197
1198impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O> Stream<T, L, Unbounded, O> {
1199    /// Produces a new stream that interleaves the elements of the two input streams.
1200    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1201    ///
1202    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1203    /// [`Bounded`], you can use [`Stream::chain`] instead.
1204    ///
1205    /// # Example
1206    /// ```rust
1207    /// # use hydro_lang::*;
1208    /// # use futures::StreamExt;
1209    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1210    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1211    /// numbers.clone().map(q!(|x| x + 1)).union(numbers)
1212    /// # }, |mut stream| async move {
1213    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1214    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1215    /// #     assert_eq!(stream.next().await.unwrap(), w);
1216    /// # }
1217    /// # }));
1218    /// ```
1219    pub fn union<O2>(self, other: Stream<T, L, Unbounded, O2>) -> Stream<T, L, Unbounded, NoOrder> {
1220        let tick = self.location.tick();
1221        unsafe {
1222            // SAFETY: Because the outputs are unordered,
1223            // we can interleave batches from both streams.
1224            self.tick_batch(&tick)
1225                .assume_ordering::<NoOrder>()
1226                .chain(other.tick_batch(&tick).assume_ordering::<NoOrder>())
1227                .all_ticks()
1228                .assume_ordering()
1229        }
1230    }
1231}
1232
1233impl<'a, T, L, O> Stream<T, L, Bounded, O>
1234where
1235    L: Location<'a>,
1236{
1237    /// Produces a new stream that emits the input elements in sorted order.
1238    ///
1239    /// The input stream can have any ordering guarantee, but the output stream
1240    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1241    /// elements in the input stream are available, so it requires the input stream
1242    /// to be [`Bounded`].
1243    ///
1244    /// # Example
1245    /// ```rust
1246    /// # use hydro_lang::*;
1247    /// # use futures::StreamExt;
1248    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1249    /// let tick = process.tick();
1250    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1251    /// let batch = unsafe { numbers.tick_batch(&tick) };
1252    /// batch.sort().all_ticks()
1253    /// # }, |mut stream| async move {
1254    /// // 1, 2, 3, 4
1255    /// # for w in (1..5) {
1256    /// #     assert_eq!(stream.next().await.unwrap(), w);
1257    /// # }
1258    /// # }));
1259    /// ```
1260    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder>
1261    where
1262        T: Ord,
1263    {
1264        Stream::new(
1265            self.location.clone(),
1266            HydroNode::Sort {
1267                input: Box::new(self.ir_node.into_inner()),
1268                metadata: self.location.new_node_metadata::<T>(),
1269            },
1270        )
1271    }
1272
1273    /// Produces a new stream that first emits the elements of the `self` stream,
1274    /// and then emits the elements of the `other` stream. The output stream has
1275    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1276    /// [`TotalOrder`] guarantee.
1277    ///
1278    /// Currently, both input streams must be [`Bounded`]. This operator will block
1279    /// on the first stream until all its elements are available. In a future version,
1280    /// we will relax the requirement on the `other` stream.
1281    ///
1282    /// # Example
1283    /// ```rust
1284    /// # use hydro_lang::*;
1285    /// # use futures::StreamExt;
1286    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1287    /// let tick = process.tick();
1288    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1289    /// let batch = unsafe { numbers.tick_batch(&tick) };
1290    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1291    /// # }, |mut stream| async move {
1292    /// // 2, 3, 4, 5, 1, 2, 3, 4
1293    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1294    /// #     assert_eq!(stream.next().await.unwrap(), w);
1295    /// # }
1296    /// # }));
1297    /// ```
1298    pub fn chain<O2>(self, other: Stream<T, L, Bounded, O2>) -> Stream<T, L, Bounded, O::Min>
1299    where
1300        O: MinOrder<O2>,
1301    {
1302        check_matching_location(&self.location, &other.location);
1303
1304        Stream::new(
1305            self.location.clone(),
1306            HydroNode::Chain {
1307                first: Box::new(self.ir_node.into_inner()),
1308                second: Box::new(other.ir_node.into_inner()),
1309                metadata: self.location.new_node_metadata::<T>(),
1310            },
1311        )
1312    }
1313}
1314
1315impl<'a, K, V1, L, B, O> Stream<(K, V1), L, B, O>
1316where
1317    L: Location<'a>,
1318{
1319    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1320    /// by equi-joining the two streams on the key attribute `K`.
1321    ///
1322    /// # Example
1323    /// ```rust
1324    /// # use hydro_lang::*;
1325    /// # use std::collections::HashSet;
1326    /// # use futures::StreamExt;
1327    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1328    /// let tick = process.tick();
1329    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1330    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1331    /// stream1.join(stream2)
1332    /// # }, |mut stream| async move {
1333    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1334    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1335    /// # stream.map(|i| assert!(expected.contains(&i)));
1336    /// # }));
1337    pub fn join<V2, O2>(self, n: Stream<(K, V2), L, B, O2>) -> Stream<(K, (V1, V2)), L, B, NoOrder>
1338    where
1339        K: Eq + Hash,
1340    {
1341        check_matching_location(&self.location, &n.location);
1342
1343        Stream::new(
1344            self.location.clone(),
1345            HydroNode::Join {
1346                left: Box::new(self.ir_node.into_inner()),
1347                right: Box::new(n.ir_node.into_inner()),
1348                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1349            },
1350        )
1351    }
1352
1353    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1354    /// computes the anti-join of the items in the input -- i.e. returns
1355    /// unique items in the first input that do not have a matching key
1356    /// in the second input.
1357    ///
1358    /// # Example
1359    /// ```rust
1360    /// # use hydro_lang::*;
1361    /// # use futures::StreamExt;
1362    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1363    /// let tick = process.tick();
1364    /// let stream = unsafe {
1365    ///    process
1366    ///    .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1367    ///    .tick_batch(&tick)
1368    /// };
1369    /// let batch = unsafe {
1370    ///     process
1371    ///         .source_iter(q!(vec![1, 2]))
1372    ///         .tick_batch(&tick)
1373    /// };
1374    /// stream.anti_join(batch).all_ticks()
1375    /// # }, |mut stream| async move {
1376    /// # for w in vec![(3, 'c'), (4, 'd')] {
1377    /// #     assert_eq!(stream.next().await.unwrap(), w);
1378    /// # }
1379    /// # }));
1380    pub fn anti_join<O2>(self, n: Stream<K, L, Bounded, O2>) -> Stream<(K, V1), L, B, O>
1381    where
1382        K: Eq + Hash,
1383    {
1384        check_matching_location(&self.location, &n.location);
1385
1386        Stream::new(
1387            self.location.clone(),
1388            HydroNode::AntiJoin {
1389                pos: Box::new(self.ir_node.into_inner()),
1390                neg: Box::new(n.ir_node.into_inner()),
1391                metadata: self.location.new_node_metadata::<(K, V1)>(),
1392            },
1393        )
1394    }
1395}
1396
1397impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded>
1398where
1399    K: Eq + Hash,
1400    L: Location<'a>,
1401{
1402    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1403    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1404    /// in the second element are accumulated via the `comb` closure.
1405    ///
1406    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1407    /// to depend on the order of elements in the stream.
1408    ///
1409    /// If the input and output value types are the same and do not require initialization then use
1410    /// [`Stream::reduce_keyed`].
1411    ///
1412    /// # Example
1413    /// ```rust
1414    /// # use hydro_lang::*;
1415    /// # use futures::StreamExt;
1416    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1417    /// let tick = process.tick();
1418    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1419    /// let batch = unsafe { numbers.tick_batch(&tick) };
1420    /// batch
1421    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1422    ///     .all_ticks()
1423    /// # }, |mut stream| async move {
1424    /// // (1, 5), (2, 7)
1425    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1426    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1427    /// # }));
1428    /// ```
1429    pub fn fold_keyed<A, I, F>(
1430        self,
1431        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1432        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1433    ) -> Stream<(K, A), Tick<L>, Bounded>
1434    where
1435        I: Fn() -> A + 'a,
1436        F: Fn(&mut A, V) + 'a,
1437    {
1438        let init = init.splice_fn0_ctx(&self.location).into();
1439        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1440
1441        Stream::new(
1442            self.location.clone(),
1443            HydroNode::FoldKeyed {
1444                init,
1445                acc: comb,
1446                input: Box::new(self.ir_node.into_inner()),
1447                metadata: self.location.new_node_metadata::<(K, A)>(),
1448            },
1449        )
1450    }
1451
1452    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1453    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1454    /// in the second element are accumulated via the `comb` closure.
1455    ///
1456    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1457    /// to depend on the order of elements in the stream.
1458    ///
1459    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1460    ///
1461    /// # Example
1462    /// ```rust
1463    /// # use hydro_lang::*;
1464    /// # use futures::StreamExt;
1465    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1466    /// let tick = process.tick();
1467    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1468    /// let batch = unsafe { numbers.tick_batch(&tick) };
1469    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1470    /// # }, |mut stream| async move {
1471    /// // (1, 5), (2, 7)
1472    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1473    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1474    /// # }));
1475    /// ```
1476    pub fn reduce_keyed<F>(
1477        self,
1478        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1479    ) -> Stream<(K, V), Tick<L>, Bounded>
1480    where
1481        F: Fn(&mut V, V) + 'a,
1482    {
1483        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1484
1485        Stream::new(
1486            self.location.clone(),
1487            HydroNode::ReduceKeyed {
1488                f,
1489                input: Box::new(self.ir_node.into_inner()),
1490                metadata: self.location.new_node_metadata::<(K, V)>(),
1491            },
1492        )
1493    }
1494}
1495
1496impl<'a, K, V, L, O> Stream<(K, V), Tick<L>, Bounded, O>
1497where
1498    K: Eq + Hash,
1499    L: Location<'a>,
1500{
1501    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1502    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1503    /// in the second element are accumulated via the `comb` closure.
1504    ///
1505    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1506    ///
1507    /// If the input and output value types are the same and do not require initialization then use
1508    /// [`Stream::reduce_keyed_commutative`].
1509    ///
1510    /// # Example
1511    /// ```rust
1512    /// # use hydro_lang::*;
1513    /// # use futures::StreamExt;
1514    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1515    /// let tick = process.tick();
1516    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1517    /// let batch = unsafe { numbers.tick_batch(&tick) };
1518    /// batch
1519    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1520    ///     .all_ticks()
1521    /// # }, |mut stream| async move {
1522    /// // (1, 5), (2, 7)
1523    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1524    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1525    /// # }));
1526    /// ```
1527    pub fn fold_keyed_commutative<A, I, F>(
1528        self,
1529        init: impl IntoQuotedMut<'a, I, Tick<L>>,
1530        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1531    ) -> Stream<(K, A), Tick<L>, Bounded, O>
1532    where
1533        I: Fn() -> A + 'a,
1534        F: Fn(&mut A, V) + 'a,
1535    {
1536        let init = init.splice_fn0_ctx(&self.location).into();
1537        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1538
1539        Stream::new(
1540            self.location.clone(),
1541            HydroNode::FoldKeyed {
1542                init,
1543                acc: comb,
1544                input: Box::new(self.ir_node.into_inner()),
1545                metadata: self.location.new_node_metadata::<(K, A)>(),
1546            },
1547        )
1548    }
1549
1550    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
1551    /// # Example
1552    /// ```rust
1553    /// # use hydro_lang::*;
1554    /// # use futures::StreamExt;
1555    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1556    /// let tick = process.tick();
1557    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1558    /// let batch = unsafe { numbers.tick_batch(&tick) };
1559    /// batch.keys().all_ticks()
1560    /// # }, |mut stream| async move {
1561    /// // 1, 2
1562    /// # assert_eq!(stream.next().await.unwrap(), 1);
1563    /// # assert_eq!(stream.next().await.unwrap(), 2);
1564    /// # }));
1565    /// ```
1566    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, O> {
1567        self.fold_keyed_commutative(q!(|| ()), q!(|_, _| {}))
1568            .map(q!(|(k, _)| k))
1569    }
1570
1571    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1572    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1573    /// in the second element are accumulated via the `comb` closure.
1574    ///
1575    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1576    ///
1577    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
1578    ///
1579    /// # Example
1580    /// ```rust
1581    /// # use hydro_lang::*;
1582    /// # use futures::StreamExt;
1583    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1584    /// let tick = process.tick();
1585    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1586    /// let batch = unsafe { numbers.tick_batch(&tick) };
1587    /// batch
1588    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
1589    ///     .all_ticks()
1590    /// # }, |mut stream| async move {
1591    /// // (1, 5), (2, 7)
1592    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1593    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1594    /// # }));
1595    /// ```
1596    pub fn reduce_keyed_commutative<F>(
1597        self,
1598        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1599    ) -> Stream<(K, V), Tick<L>, Bounded, O>
1600    where
1601        F: Fn(&mut V, V) + 'a,
1602    {
1603        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1604
1605        Stream::new(
1606            self.location.clone(),
1607            HydroNode::ReduceKeyed {
1608                f,
1609                input: Box::new(self.ir_node.into_inner()),
1610                metadata: self.location.new_node_metadata::<(K, V)>(),
1611            },
1612        )
1613    }
1614}
1615
1616impl<'a, T, L, B, O> Stream<T, Atomic<L>, B, O>
1617where
1618    L: Location<'a> + NoTick,
1619{
1620    /// Returns a stream corresponding to the latest batch of elements being atomically
1621    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1622    /// the order of the input.
1623    ///
1624    /// # Safety
1625    /// The batch boundaries are non-deterministic and may change across executions.
1626    pub unsafe fn tick_batch(self) -> Stream<T, Tick<L>, Bounded, O> {
1627        Stream::new(
1628            self.location.clone().tick,
1629            HydroNode::Unpersist {
1630                inner: Box::new(self.ir_node.into_inner()),
1631                metadata: self.location.new_node_metadata::<T>(),
1632            },
1633        )
1634    }
1635
1636    pub fn end_atomic(self) -> Stream<T, L, B, O> {
1637        Stream::new(self.location.tick.l, self.ir_node.into_inner())
1638    }
1639
1640    pub fn atomic_source(&self) -> Tick<L> {
1641        self.location.tick.clone()
1642    }
1643}
1644
1645impl<'a, T, L, B, O> Stream<T, L, B, O>
1646where
1647    L: Location<'a> + NoTick + NoAtomic,
1648{
1649    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O> {
1650        Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
1651    }
1652
1653    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
1654    /// Future outputs are produced as available, regardless of input arrival order.
1655    ///
1656    /// # Example
1657    /// ```rust
1658    /// # use std::collections::HashSet;
1659    /// # use futures::StreamExt;
1660    /// # use hydro_lang::*;
1661    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1662    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
1663    ///     .map(q!(|x| async move {
1664    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1665    ///         x
1666    ///     }))
1667    ///     .resolve_futures()
1668    /// #   },
1669    /// #   |mut stream| async move {
1670    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
1671    /// #       let mut output = HashSet::new();
1672    /// #       for _ in 1..10 {
1673    /// #           output.insert(stream.next().await.unwrap());
1674    /// #       }
1675    /// #       assert_eq!(
1676    /// #           output,
1677    /// #           HashSet::<i32>::from_iter(1..10)
1678    /// #       );
1679    /// #   },
1680    /// # ));
1681    pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder>
1682    where
1683        T: Future<Output = T2>,
1684    {
1685        Stream::new(
1686            self.location.clone(),
1687            HydroNode::ResolveFutures {
1688                input: Box::new(self.ir_node.into_inner()),
1689                metadata: self.location.new_node_metadata::<T2>(),
1690            },
1691        )
1692    }
1693
1694    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1695    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1696    /// the order of the input.
1697    ///
1698    /// # Safety
1699    /// The batch boundaries are non-deterministic and may change across executions.
1700    pub unsafe fn tick_batch(self, tick: &Tick<L>) -> Stream<T, Tick<L>, Bounded, O> {
1701        unsafe { self.atomic(tick).tick_batch() }
1702    }
1703
1704    /// Given a time interval, returns a stream corresponding to samples taken from the
1705    /// stream roughly at that interval. The output will have elements in the same order
1706    /// as the input, but with arbitrary elements skipped between samples. There is also
1707    /// no guarantee on the exact timing of the samples.
1708    ///
1709    /// # Safety
1710    /// The output stream is non-deterministic in which elements are sampled, since this
1711    /// is controlled by a clock.
1712    pub unsafe fn sample_every(
1713        self,
1714        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1715    ) -> Stream<T, L, Unbounded, O> {
1716        let samples = unsafe {
1717            // SAFETY: source of intentional non-determinism
1718            self.location.source_interval(interval)
1719        };
1720
1721        let tick = self.location.tick();
1722        unsafe {
1723            // SAFETY: source of intentional non-determinism
1724            self.tick_batch(&tick)
1725                .continue_if(samples.tick_batch(&tick).first())
1726                .all_ticks()
1727        }
1728    }
1729
1730    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1731    /// stream has not emitted a value since that duration.
1732    ///
1733    /// # Safety
1734    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1735    /// samples take place, timeouts may be non-deterministically generated or missed,
1736    /// and the notification of the timeout may be delayed as well. There is also no
1737    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1738    /// detected based on when the next sample is taken.
1739    pub unsafe fn timeout(
1740        self,
1741        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1742    ) -> Optional<(), L, Unbounded>
1743    where
1744        O: MinOrder<NoOrder, Min = NoOrder>,
1745    {
1746        let tick = self.location.tick();
1747
1748        let latest_received = self.fold_commutative(
1749            q!(|| None),
1750            q!(|latest, _| {
1751                *latest = Some(Instant::now());
1752            }),
1753        );
1754
1755        unsafe {
1756            // SAFETY: Non-deterministic delay in detecting a timeout is expected.
1757            latest_received.latest_tick(&tick)
1758        }
1759        .filter_map(q!(move |latest_received| {
1760            if let Some(latest_received) = latest_received {
1761                if Instant::now().duration_since(latest_received) > duration {
1762                    Some(())
1763                } else {
1764                    None
1765                }
1766            } else {
1767                Some(())
1768            }
1769        }))
1770        .latest()
1771    }
1772}
1773
1774impl<'a, F, T, L, B, O> Stream<F, L, B, O>
1775where
1776    L: Location<'a> + NoTick + NoAtomic,
1777    F: Future<Output = T>,
1778{
1779    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
1780    /// Future outputs are produced in the same order as the input stream.
1781    ///
1782    /// # Example
1783    /// ```rust
1784    /// # use std::collections::HashSet;
1785    /// # use futures::StreamExt;
1786    /// # use hydro_lang::*;
1787    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1788    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
1789    ///     .map(q!(|x| async move {
1790    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1791    ///         x
1792    ///     }))
1793    ///     .resolve_futures_ordered()
1794    /// #   },
1795    /// #   |mut stream| async move {
1796    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
1797    /// #       let mut output = Vec::new();
1798    /// #       for _ in 1..10 {
1799    /// #           output.push(stream.next().await.unwrap());
1800    /// #       }
1801    /// #       assert_eq!(
1802    /// #           output,
1803    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
1804    /// #       );
1805    /// #   },
1806    /// # ));
1807    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O> {
1808        Stream::new(
1809            self.location.clone(),
1810            HydroNode::ResolveFuturesOrdered {
1811                input: Box::new(self.ir_node.into_inner()),
1812                metadata: self.location.new_node_metadata::<T>(),
1813            },
1814        )
1815    }
1816}
1817
1818impl<'a, T, L, B, O> Stream<T, L, B, O>
1819where
1820    L: Location<'a> + NoTick,
1821{
1822    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
1823        let f = f.splice_fn1_ctx(&self.location).into();
1824        let metadata = self.location.new_node_metadata::<T>();
1825        self.location
1826            .flow_state()
1827            .borrow_mut()
1828            .leaves
1829            .as_mut()
1830            .expect(FLOW_USED_MESSAGE)
1831            .push(HydroLeaf::ForEach {
1832                input: Box::new(HydroNode::Unpersist {
1833                    inner: Box::new(self.ir_node.into_inner()),
1834                    metadata: metadata.clone(),
1835                }),
1836                f,
1837                metadata,
1838            });
1839    }
1840
1841    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1842    where
1843        S: 'a + futures::Sink<T> + Unpin,
1844    {
1845        self.location
1846            .flow_state()
1847            .borrow_mut()
1848            .leaves
1849            .as_mut()
1850            .expect(FLOW_USED_MESSAGE)
1851            .push(HydroLeaf::DestSink {
1852                sink: sink.splice_typed_ctx(&self.location).into(),
1853                input: Box::new(self.ir_node.into_inner()),
1854                metadata: self.location.new_node_metadata::<T>(),
1855            });
1856    }
1857}
1858
1859impl<'a, T, L, O> Stream<T, Tick<L>, Bounded, O>
1860where
1861    L: Location<'a>,
1862{
1863    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O> {
1864        Stream::new(
1865            self.location.outer().clone(),
1866            HydroNode::Persist {
1867                inner: Box::new(self.ir_node.into_inner()),
1868                metadata: self.location.new_node_metadata::<T>(),
1869            },
1870        )
1871    }
1872
1873    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O> {
1874        Stream::new(
1875            Atomic {
1876                tick: self.location.clone(),
1877            },
1878            HydroNode::Persist {
1879                inner: Box::new(self.ir_node.into_inner()),
1880                metadata: self.location.new_node_metadata::<T>(),
1881            },
1882        )
1883    }
1884
1885    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O>
1886    where
1887        T: Clone,
1888    {
1889        Stream::new(
1890            self.location.clone(),
1891            HydroNode::Persist {
1892                inner: Box::new(self.ir_node.into_inner()),
1893                metadata: self.location.new_node_metadata::<T>(),
1894            },
1895        )
1896    }
1897
1898    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O> {
1899        Stream::new(
1900            self.location.clone(),
1901            HydroNode::DeferTick {
1902                input: Box::new(self.ir_node.into_inner()),
1903                metadata: self.location.new_node_metadata::<T>(),
1904            },
1905        )
1906    }
1907
1908    pub fn delta(self) -> Stream<T, Tick<L>, Bounded, O> {
1909        Stream::new(
1910            self.location.clone(),
1911            HydroNode::Delta {
1912                inner: Box::new(self.ir_node.into_inner()),
1913                metadata: self.location.new_node_metadata::<T>(),
1914            },
1915        )
1916    }
1917}
1918
1919pub fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
1920    let root = get_this_crate();
1921
1922    if is_demux {
1923        parse_quote! {
1924            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::ClusterId<_>, #t_type), _>(
1925                |(id, data)| {
1926                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
1927                }
1928            )
1929        }
1930    } else {
1931        parse_quote! {
1932            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
1933                |data| {
1934                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
1935                }
1936            )
1937        }
1938    }
1939}
1940
1941fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
1942    serialize_bincode_with_type(is_demux, &stageleft::quote_type::<T>())
1943}
1944
1945pub fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
1946    let root = get_this_crate();
1947
1948    if let Some(c_type) = tagged {
1949        parse_quote! {
1950            |res| {
1951                let (id, b) = res.unwrap();
1952                (#root::ClusterId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
1953            }
1954        }
1955    } else {
1956        parse_quote! {
1957            |res| {
1958                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
1959            }
1960        }
1961    }
1962}
1963
1964pub(super) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
1965    deserialize_bincode_with_type(tagged, &stageleft::quote_type::<T>())
1966}
1967
1968impl<'a, T, L, B, O> Stream<T, L, B, O>
1969where
1970    L: Location<'a> + NoTick,
1971{
1972    pub fn send_bincode<L2, CoreType>(
1973        self,
1974        other: &L2,
1975    ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<CoreType>, L2, Unbounded, O::Min>
1976    where
1977        L::Root: CanSend<'a, L2, In<CoreType> = T>,
1978        L2: Location<'a>,
1979        CoreType: Serialize + DeserializeOwned,
1980        O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
1981    {
1982        let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::Root::is_demux()));
1983
1984        let deserialize_pipeline = Some(deserialize_bincode::<CoreType>(
1985            L::Root::tagged_type().as_ref(),
1986        ));
1987
1988        Stream::new(
1989            other.clone(),
1990            HydroNode::Network {
1991                from_key: None,
1992                to_location: other.id(),
1993                to_key: None,
1994                serialize_fn: serialize_pipeline.map(|e| e.into()),
1995                instantiate_fn: DebugInstantiate::Building,
1996                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
1997                input: Box::new(self.ir_node.into_inner()),
1998                metadata: other.new_node_metadata::<CoreType>(),
1999            },
2000        )
2001    }
2002
2003    pub fn send_bincode_external<L2, CoreType>(
2004        self,
2005        other: &ExternalProcess<L2>,
2006    ) -> ExternalBincodeStream<L::Out<CoreType>>
2007    where
2008        L: CanSend<'a, ExternalProcess<'a, L2>, In<CoreType> = T, Out<CoreType> = CoreType>,
2009        L2: 'a,
2010        CoreType: Serialize + DeserializeOwned,
2011        // for now, we restirct Out<CoreType> to be CoreType, which means no tagged cluster -> external
2012    {
2013        let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::is_demux()));
2014
2015        let metadata = other.new_node_metadata::<CoreType>();
2016
2017        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
2018
2019        let external_key = flow_state_borrow.next_external_out;
2020        flow_state_borrow.next_external_out += 1;
2021
2022        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
2023
2024        let dummy_f: syn::Expr = syn::parse_quote!(());
2025
2026        leaves.push(HydroLeaf::ForEach {
2027            f: dummy_f.into(),
2028            input: Box::new(HydroNode::Network {
2029                from_key: None,
2030                to_location: other.id(),
2031                to_key: Some(external_key),
2032                serialize_fn: serialize_pipeline.map(|e| e.into()),
2033                instantiate_fn: DebugInstantiate::Building,
2034                deserialize_fn: None,
2035                input: Box::new(self.ir_node.into_inner()),
2036                metadata: metadata.clone(),
2037            }),
2038            metadata,
2039        });
2040
2041        ExternalBincodeStream {
2042            process_id: other.id,
2043            port_id: external_key,
2044            _phantom: PhantomData,
2045        }
2046    }
2047
2048    pub fn send_bytes<L2>(
2049        self,
2050        other: &L2,
2051    ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<Bytes>, L2, Unbounded, O::Min>
2052    where
2053        L2: Location<'a>,
2054        L::Root: CanSend<'a, L2, In<Bytes> = T>,
2055        O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2056    {
2057        let root = get_this_crate();
2058        Stream::new(
2059            other.clone(),
2060            HydroNode::Network {
2061                from_key: None,
2062                to_location: other.id(),
2063                to_key: None,
2064                serialize_fn: None,
2065                instantiate_fn: DebugInstantiate::Building,
2066                deserialize_fn: if let Some(c_type) = L::Root::tagged_type() {
2067                    let expr: syn::Expr = parse_quote!(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()));
2068                    Some(expr.into())
2069                } else {
2070                    let expr: syn::Expr = parse_quote!(|b| b.unwrap().freeze());
2071                    Some(expr.into())
2072                },
2073                input: Box::new(self.ir_node.into_inner()),
2074                metadata: other.new_node_metadata::<Bytes>(),
2075            },
2076        )
2077    }
2078
2079    pub fn send_bytes_external<L2>(self, other: &ExternalProcess<L2>) -> ExternalBytesPort
2080    where
2081        L2: 'a,
2082        L::Root: CanSend<'a, ExternalProcess<'a, L2>, In<Bytes> = T, Out<Bytes> = Bytes>,
2083    {
2084        let metadata = other.new_node_metadata::<Bytes>();
2085
2086        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
2087        let external_key = flow_state_borrow.next_external_out;
2088        flow_state_borrow.next_external_out += 1;
2089
2090        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
2091
2092        let dummy_f: syn::Expr = syn::parse_quote!(());
2093
2094        leaves.push(HydroLeaf::ForEach {
2095            f: dummy_f.into(),
2096            input: Box::new(HydroNode::Network {
2097                from_key: None,
2098                to_location: other.id(),
2099                to_key: Some(external_key),
2100                serialize_fn: None,
2101                instantiate_fn: DebugInstantiate::Building,
2102                deserialize_fn: None,
2103                input: Box::new(self.ir_node.into_inner()),
2104                metadata: metadata.clone(),
2105            }),
2106            metadata,
2107        });
2108
2109        ExternalBytesPort {
2110            process_id: other.id,
2111            port_id: external_key,
2112        }
2113    }
2114
2115    pub fn send_bincode_anonymous<L2, Tag, CoreType>(
2116        self,
2117        other: &L2,
2118    ) -> Stream<CoreType, L2, Unbounded, O::Min>
2119    where
2120        L2: Location<'a>,
2121        L::Root: CanSend<'a, L2, In<CoreType> = T, Out<CoreType> = (Tag, CoreType)>,
2122        CoreType: Serialize + DeserializeOwned,
2123        O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2124    {
2125        self.send_bincode::<L2, CoreType>(other).map(q!(|(_, b)| b))
2126    }
2127
2128    pub fn send_bytes_anonymous<L2, Tag>(self, other: &L2) -> Stream<Bytes, L2, Unbounded, O::Min>
2129    where
2130        L2: Location<'a>,
2131        L::Root: CanSend<'a, L2, In<Bytes> = T, Out<Bytes> = (Tag, Bytes)>,
2132        O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2133    {
2134        self.send_bytes::<L2>(other).map(q!(|(_, b)| b))
2135    }
2136
2137    #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
2138    pub fn broadcast_bincode<C2>(
2139        self,
2140        other: &Cluster<'a, C2>,
2141    ) -> Stream<<L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>, Cluster<'a, C2>, Unbounded, O::Min>
2142    where
2143        C2: 'a,
2144        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
2145        T: Clone + Serialize + DeserializeOwned,
2146        O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2147    {
2148        let ids = other.members();
2149
2150        let to_send: Stream<(u32, Bytes), L, B, O> = self
2151            .map::<Bytes, _>(q!(|v| bincode::serialize(&v).unwrap().into()))
2152            .flat_map_ordered(q!(|v| { ids.iter().map(move |id| (id.raw_id, v.clone())) }));
2153
2154        let deserialize_pipeline = Some(deserialize_bincode::<T>(L::Root::tagged_type().as_ref()));
2155
2156        Stream::new(
2157            other.clone(),
2158            HydroNode::Network {
2159                from_key: None,
2160                to_location: other.id(),
2161                to_key: None,
2162                serialize_fn: None,
2163                instantiate_fn: DebugInstantiate::Building,
2164                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
2165                input: Box::new(to_send.ir_node.into_inner()),
2166                metadata: other.new_node_metadata::<T>(),
2167            },
2168        )
2169    }
2170
2171    pub fn broadcast_bincode_anonymous<C2, Tag>(
2172        self,
2173        other: &Cluster<'a, C2>,
2174    ) -> Stream<T, Cluster<'a, C2>, Unbounded, O::Min>
2175    where
2176        C2: 'a,
2177        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
2178        T: Clone + Serialize + DeserializeOwned,
2179        O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2180    {
2181        self.broadcast_bincode(other).map(q!(|(_, b)| b))
2182    }
2183
2184    #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
2185    pub fn broadcast_bytes<C2>(
2186        self,
2187        other: &Cluster<'a, C2>,
2188    ) -> Stream<
2189        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2190        Cluster<'a, C2>,
2191        Unbounded,
2192        O::Min,
2193    >
2194    where
2195        C2: 'a,
2196        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)>,
2197        T: Clone,
2198        O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2199    {
2200        let ids = other.members();
2201
2202        self.flat_map_ordered(q!(|b| ids.iter().map(move |id| (
2203            ::std::clone::Clone::clone(id),
2204            ::std::clone::Clone::clone(&b)
2205        ))))
2206        .send_bytes(other)
2207    }
2208
2209    pub fn broadcast_bytes_anonymous<C2, Tag>(
2210        self,
2211        other: &Cluster<'a, C2>,
2212    ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded, O::Min>
2213    where
2214        C2: 'a,
2215        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2216            + 'a,
2217        T: Clone,
2218        O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2219    {
2220        self.broadcast_bytes(other).map(q!(|(_, b)| b))
2221    }
2222}
2223
2224#[expect(clippy::type_complexity, reason = "ordering semantics for round-robin")]
2225impl<'a, T, L, B> Stream<T, L, B, TotalOrder>
2226where
2227    L: Location<'a> + NoTick,
2228{
2229    pub fn round_robin_bincode<C2>(
2230        self,
2231        other: &Cluster<'a, C2>,
2232    ) -> Stream<
2233        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
2234        Cluster<'a, C2>,
2235        Unbounded,
2236        <TotalOrder as MinOrder<
2237            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2238        >>::Min,
2239    >
2240    where
2241        C2: 'a,
2242        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
2243        T: Clone + Serialize + DeserializeOwned,
2244        TotalOrder:
2245            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2246    {
2247        let ids = other.members();
2248
2249        self.enumerate()
2250            .map(q!(|(i, w)| (ids[i % ids.len()], w)))
2251            .send_bincode(other)
2252    }
2253
2254    pub fn round_robin_bincode_anonymous<C2, Tag>(
2255        self,
2256        other: &Cluster<'a, C2>,
2257    ) -> Stream<
2258        T,
2259        Cluster<'a, C2>,
2260        Unbounded,
2261        <TotalOrder as MinOrder<
2262            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2263        >>::Min,
2264    >
2265    where
2266        C2: 'a,
2267        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
2268        T: Clone + Serialize + DeserializeOwned,
2269        TotalOrder:
2270            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2271    {
2272        self.round_robin_bincode(other).map(q!(|(_, b)| b))
2273    }
2274
2275    pub fn round_robin_bytes<C2>(
2276        self,
2277        other: &Cluster<'a, C2>,
2278    ) -> Stream<
2279        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2280        Cluster<'a, C2>,
2281        Unbounded,
2282        <TotalOrder as MinOrder<
2283            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2284        >>::Min,
2285    >
2286    where
2287        C2: 'a,
2288        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
2289        T: Clone,
2290        TotalOrder:
2291            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2292    {
2293        let ids = other.members();
2294
2295        self.enumerate()
2296            .map(q!(|(i, w)| (ids[i % ids.len()], w)))
2297            .send_bytes(other)
2298    }
2299
2300    pub fn round_robin_bytes_anonymous<C2, Tag>(
2301        self,
2302        other: &Cluster<'a, C2>,
2303    ) -> Stream<
2304        Bytes,
2305        Cluster<'a, C2>,
2306        Unbounded,
2307        <TotalOrder as MinOrder<
2308            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2309        >>::Min,
2310    >
2311    where
2312        C2: 'a,
2313        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2314            + 'a,
2315        T: Clone,
2316        TotalOrder:
2317            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2318    {
2319        self.round_robin_bytes(other).map(q!(|(_, b)| b))
2320    }
2321}
2322
2323#[cfg(test)]
2324mod tests {
2325    use futures::StreamExt;
2326    use hydro_deploy::Deployment;
2327    use serde::{Deserialize, Serialize};
2328    use stageleft::q;
2329
2330    use crate::FlowBuilder;
2331    use crate::location::Location;
2332
2333    struct P1 {}
2334    struct P2 {}
2335
2336    #[derive(Serialize, Deserialize, Debug)]
2337    struct SendOverNetwork {
2338        n: u32,
2339    }
2340
2341    #[tokio::test]
2342    async fn first_ten_distributed() {
2343        let mut deployment = Deployment::new();
2344
2345        let flow = FlowBuilder::new();
2346        let first_node = flow.process::<P1>();
2347        let second_node = flow.process::<P2>();
2348        let external = flow.external_process::<P2>();
2349
2350        let numbers = first_node.source_iter(q!(0..10));
2351        let out_port = numbers
2352            .map(q!(|n| SendOverNetwork { n }))
2353            .send_bincode(&second_node)
2354            .send_bincode_external(&external);
2355
2356        let nodes = flow
2357            .with_process(&first_node, deployment.Localhost())
2358            .with_process(&second_node, deployment.Localhost())
2359            .with_external(&external, deployment.Localhost())
2360            .deploy(&mut deployment);
2361
2362        deployment.deploy().await.unwrap();
2363
2364        let mut external_out = nodes.connect_source_bincode(out_port).await;
2365
2366        deployment.start().await.unwrap();
2367
2368        for i in 0..10 {
2369            assert_eq!(external_out.next().await.unwrap().n, i);
2370        }
2371    }
2372}