// hydro_lang/stream.rs
1use std::cell::RefCell;
2use std::future::Future;
3use std::hash::Hash;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use bytes::Bytes;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12use syn::parse_quote;
13use tokio::time::Instant;
14
15use crate::builder::FLOW_USED_MESSAGE;
16use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
17use crate::ir::{DebugInstantiate, HydroLeaf, HydroNode, TeeNode};
18use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{
21 CanSend, ExternalProcess, Location, LocationId, NoTick, Tick, check_matching_location,
22};
23use crate::staging_util::get_this_crate;
24use crate::{Bounded, Cluster, ClusterId, Optional, Singleton, Unbounded};
25
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is a zero-sized marker type used only at the type level; it is
/// never constructed at runtime.
pub struct TotalOrder {}
30
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is a zero-sized marker type used only at the type level; it is
/// never constructed at runtime.
pub struct NoOrder {}
37
/// Helper trait for determining the weakest of two orderings.
///
/// Sealed so that downstream crates cannot add new ordering lattices;
/// the only implementations are the reflexive one and the
/// `TotalOrder`/`NoOrder` pair below.
#[sealed::sealed]
pub trait MinOrder<Other> {
    /// The weaker of the two orderings.
    type Min;
}
44
#[sealed::sealed]
impl<T> MinOrder<T> for T {
    // Reflexive case: an ordering compared with itself is its own minimum.
    type Min = T;
}

#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    // `NoOrder` is the weaker guarantee of the two.
    type Min = NoOrder;
}

#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    // Symmetric case: the weaker guarantee still wins.
    type Min = NoOrder;
}
59
/// An ordered sequence stream of elements of type `T`.
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`]
///   or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`]
///   or [`NoOrder`] (default is [`TotalOrder`])
pub struct Stream<Type, Loc, Bound, Order = TotalOrder> {
    // The location (process, cluster, or tick) this stream lives at.
    location: Loc,
    // The staged IR node that computes this stream. Wrapped in a `RefCell`
    // so that `Clone` can rewrite it in place into a `Tee`.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Carries the `Type`/`Bound`/`Order` type parameters, which have no
    // runtime representation.
    _phantom: PhantomData<(Type, Loc, Bound, Order)>,
}
75
76impl<'a, T, L, O> From<Stream<T, L, Bounded, O>> for Stream<T, L, Unbounded, O>
77where
78 L: Location<'a>,
79{
80 fn from(stream: Stream<T, L, Bounded, O>) -> Stream<T, L, Unbounded, O> {
81 Stream {
82 location: stream.location,
83 ir_node: stream.ir_node,
84 _phantom: PhantomData,
85 }
86 }
87}
88
89impl<'a, T, L, B> From<Stream<T, L, B, TotalOrder>> for Stream<T, L, B, NoOrder>
90where
91 L: Location<'a>,
92{
93 fn from(stream: Stream<T, L, B, TotalOrder>) -> Stream<T, L, B, NoOrder> {
94 Stream {
95 location: stream.location,
96 ir_node: stream.ir_node,
97 _phantom: PhantomData,
98 }
99 }
100}
101
impl<'a, T, L, B, O> Stream<T, L, B, O>
where
    L: Location<'a>,
{
    /// Returns the [`LocationId`] of the location this stream is
    /// materialized at.
    fn location_kind(&self) -> LocationId {
        self.location.id()
    }
}
110
impl<'a, T, L, O> DeferTick for Stream<T, Tick<L>, Bounded, O>
where
    L: Location<'a>,
{
    // Delegates to the inherent `Stream::defer_tick`, which delays the
    // stream's elements by one tick.
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
119
120impl<'a, T, L, O> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O>
121where
122 L: Location<'a>,
123{
124 type Location = Tick<L>;
125
126 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
127 let location_id = location.id();
128 Stream::new(
129 location.clone(),
130 HydroNode::CycleSource {
131 ident,
132 location_kind: location_id,
133 metadata: location.new_node_metadata::<T>(),
134 },
135 )
136 }
137}
138
139impl<'a, T, L, O> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O>
140where
141 L: Location<'a>,
142{
143 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
144 assert_eq!(
145 self.location.id(),
146 expected_location,
147 "locations do not match"
148 );
149 self.location
150 .flow_state()
151 .borrow_mut()
152 .leaves
153 .as_mut()
154 .expect(FLOW_USED_MESSAGE)
155 .push(HydroLeaf::CycleSink {
156 ident,
157 location_kind: self.location_kind(),
158 input: Box::new(self.ir_node.into_inner()),
159 metadata: self.location.new_node_metadata::<T>(),
160 });
161 }
162}
163
164impl<'a, T, L, B, O> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B, O>
165where
166 L: Location<'a> + NoTick,
167{
168 type Location = L;
169
170 fn create_source(ident: syn::Ident, location: L) -> Self {
171 let location_id = location.id();
172
173 Stream::new(
174 location.clone(),
175 HydroNode::Persist {
176 inner: Box::new(HydroNode::CycleSource {
177 ident,
178 location_kind: location_id,
179 metadata: location.new_node_metadata::<T>(),
180 }),
181 metadata: location.new_node_metadata::<T>(),
182 },
183 )
184 }
185}
186
187impl<'a, T, L, B, O> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B, O>
188where
189 L: Location<'a> + NoTick,
190{
191 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
192 assert_eq!(
193 self.location.id(),
194 expected_location,
195 "locations do not match"
196 );
197 let metadata = self.location.new_node_metadata::<T>();
198 self.location
199 .flow_state()
200 .borrow_mut()
201 .leaves
202 .as_mut()
203 .expect(FLOW_USED_MESSAGE)
204 .push(HydroLeaf::CycleSink {
205 ident,
206 location_kind: self.location_kind(),
207 input: Box::new(HydroNode::Unpersist {
208 inner: Box::new(self.ir_node.into_inner()),
209 metadata: metadata.clone(),
210 }),
211 metadata,
212 });
213 }
214}
215
216impl<'a, T, L, B, O> Stream<T, L, B, O>
217where
218 L: Location<'a>,
219{
220 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
221 Stream {
222 location,
223 ir_node: RefCell::new(ir_node),
224 _phantom: PhantomData,
225 }
226 }
227}
228
impl<'a, T, L, B, O> Clone for Stream<T, L, B, O>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        // On the first clone, rewrite this stream's IR node *in place* into
        // a `Tee`, so that the original handle and all clones share the same
        // underlying node. Subsequent clones skip this step because the node
        // is already a `Tee`.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            // Temporarily swap in a placeholder so we can move the node out
            // of the `RefCell` and re-wrap it.
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata::<T>(),
            };
        }

        // The clone holds a new `Rc` handle to the shared tee'd node rather
        // than a deep copy of the IR subtree.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            // The branch above just installed a `Tee`, so this is impossible.
            unreachable!()
        }
    }
}
258
impl<'a, T, L, B, O> Stream<T, L, B, O>
where
    L: Location<'a>,
{
    /// Produces a stream based on invoking `f` on each element in order.
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`Stream::inspect`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["hello", "world"]));
    /// words.map(q!(|x| x.to_uppercase()))
    /// # }, |mut stream| async move {
    /// # for w in vec!["HELLO", "WORLD"] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O>
    where
        F: Fn(T) -> U + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        let metadata = self.location.new_node_metadata::<U>();
        let mapped = HydroNode::Map {
            f,
            input: Box::new(self.ir_node.into_inner()),
            metadata,
        };
        Stream::new(self.location, mapped)
    }
294
295 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
296 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
297 /// for the output type `U` must produce items in a **deterministic** order.
298 ///
299 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
300 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
301 ///
302 /// # Example
303 /// ```rust
304 /// # use hydro_lang::*;
305 /// # use futures::StreamExt;
306 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
307 /// process
308 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
309 /// .flat_map_ordered(q!(|x| x))
310 /// # }, |mut stream| async move {
311 /// // 1, 2, 3, 4
312 /// # for w in (1..5) {
313 /// # assert_eq!(stream.next().await.unwrap(), w);
314 /// # }
315 /// # }));
316 /// ```
317 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O>
318 where
319 I: IntoIterator<Item = U>,
320 F: Fn(T) -> I + 'a,
321 {
322 let f = f.splice_fn1_ctx(&self.location).into();
323 Stream::new(
324 self.location.clone(),
325 HydroNode::FlatMap {
326 f,
327 input: Box::new(self.ir_node.into_inner()),
328 metadata: self.location.new_node_metadata::<U>(),
329 },
330 )
331 }
332
333 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
334 /// for the output type `U` to produce items in any order.
335 ///
336 /// # Example
337 /// ```rust
338 /// # use hydro_lang::*;
339 /// # use futures::StreamExt;
340 /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder>(|process| {
341 /// process
342 /// .source_iter(q!(vec![
343 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
344 /// std::collections::HashSet::from_iter(vec![3, 4]),
345 /// ]))
346 /// .flat_map_unordered(q!(|x| x))
347 /// # }, |mut stream| async move {
348 /// // 1, 2, 3, 4, but in no particular order
349 /// # let mut results = Vec::new();
350 /// # for w in (1..5) {
351 /// # results.push(stream.next().await.unwrap());
352 /// # }
353 /// # results.sort();
354 /// # assert_eq!(results, vec![1, 2, 3, 4]);
355 /// # }));
356 /// ```
357 pub fn flat_map_unordered<U, I, F>(
358 self,
359 f: impl IntoQuotedMut<'a, F, L>,
360 ) -> Stream<U, L, B, NoOrder>
361 where
362 I: IntoIterator<Item = U>,
363 F: Fn(T) -> I + 'a,
364 {
365 let f = f.splice_fn1_ctx(&self.location).into();
366 Stream::new(
367 self.location.clone(),
368 HydroNode::FlatMap {
369 f,
370 input: Box::new(self.ir_node.into_inner()),
371 metadata: self.location.new_node_metadata::<U>(),
372 },
373 )
374 }
375
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map: each element is already an iterator.
        self.flat_map_ordered(q!(|d| d))
    }
402
403 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
404 /// for the element type `T` to produce items in any order.
405 ///
406 /// # Example
407 /// ```rust
408 /// # use hydro_lang::*;
409 /// # use futures::StreamExt;
410 /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder>(|process| {
411 /// process
412 /// .source_iter(q!(vec![
413 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
414 /// std::collections::HashSet::from_iter(vec![3, 4]),
415 /// ]))
416 /// .flatten_unordered()
417 /// # }, |mut stream| async move {
418 /// // 1, 2, 3, 4, but in no particular order
419 /// # let mut results = Vec::new();
420 /// # for w in (1..5) {
421 /// # results.push(stream.next().await.unwrap());
422 /// # }
423 /// # results.sort();
424 /// # assert_eq!(results, vec![1, 2, 3, 4]);
425 /// # }));
426 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder>
427 where
428 T: IntoIterator<Item = U>,
429 {
430 self.flat_map_unordered(q!(|d| d))
431 }
432
433 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
434 /// `f`, preserving the order of the elements.
435 ///
436 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
437 /// not modify or take ownership of the values. If you need to modify the values while filtering
438 /// use [`Stream::filter_map`] instead.
439 ///
440 /// # Example
441 /// ```rust
442 /// # use hydro_lang::*;
443 /// # use futures::StreamExt;
444 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
445 /// process
446 /// .source_iter(q!(vec![1, 2, 3, 4]))
447 /// .filter(q!(|&x| x > 2))
448 /// # }, |mut stream| async move {
449 /// // 3, 4
450 /// # for w in (3..5) {
451 /// # assert_eq!(stream.next().await.unwrap(), w);
452 /// # }
453 /// # }));
454 /// ```
455 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O>
456 where
457 F: Fn(&T) -> bool + 'a,
458 {
459 let f = f.splice_fn1_borrow_ctx(&self.location).into();
460 Stream::new(
461 self.location.clone(),
462 HydroNode::Filter {
463 f,
464 input: Box::new(self.ir_node.into_inner()),
465 metadata: self.location.new_node_metadata::<T>(),
466 },
467 )
468 }
469
470 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
471 ///
472 /// # Example
473 /// ```rust
474 /// # use hydro_lang::*;
475 /// # use futures::StreamExt;
476 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
477 /// process
478 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
479 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
480 /// # }, |mut stream| async move {
481 /// // 1, 2
482 /// # for w in (1..3) {
483 /// # assert_eq!(stream.next().await.unwrap(), w);
484 /// # }
485 /// # }));
486 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O>
487 where
488 F: Fn(T) -> Option<U> + 'a,
489 {
490 let f = f.splice_fn1_ctx(&self.location).into();
491 Stream::new(
492 self.location.clone(),
493 HydroNode::FilterMap {
494 f,
495 input: Box::new(self.ir_node.into_inner()),
496 metadata: self.location.new_node_metadata::<U>(),
497 },
498 )
499 }
500
501 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
502 /// where `x` is the final value of `other`, a bounded [`Singleton`].
503 ///
504 /// # Example
505 /// ```rust
506 /// # use hydro_lang::*;
507 /// # use futures::StreamExt;
508 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
509 /// let tick = process.tick();
510 /// let batch = unsafe {
511 /// process
512 /// .source_iter(q!(vec![1, 2, 3, 4]))
513 /// .tick_batch(&tick)
514 /// };
515 /// let count = batch.clone().count(); // `count()` returns a singleton
516 /// batch.cross_singleton(count).all_ticks()
517 /// # }, |mut stream| async move {
518 /// // (1, 4), (2, 4), (3, 4), (4, 4)
519 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
520 /// # assert_eq!(stream.next().await.unwrap(), w);
521 /// # }
522 /// # }));
523 pub fn cross_singleton<O2>(
524 self,
525 other: impl Into<Optional<O2, L, Bounded>>,
526 ) -> Stream<(T, O2), L, B, O>
527 where
528 O2: Clone,
529 {
530 let other: Optional<O2, L, Bounded> = other.into();
531 check_matching_location(&self.location, &other.location);
532
533 Stream::new(
534 self.location.clone(),
535 HydroNode::CrossSingleton {
536 left: Box::new(self.ir_node.into_inner()),
537 right: Box::new(other.ir_node.into_inner()),
538 metadata: self.location.new_node_metadata::<(T, O2)>(),
539 },
540 )
541 }
542
543 /// Allow this stream through if the argument (a Bounded Optional) is non-empty, otherwise the output is empty.
544 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O> {
545 self.cross_singleton(signal.map(q!(|_u| ())))
546 .map(q!(|(d, _signal)| d))
547 }
548
549 /// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty.
550 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O> {
551 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
552 }
553
554 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
555 /// tupled pairs in a non-deterministic order.
556 ///
557 /// # Example
558 /// ```rust
559 /// # use hydro_lang::*;
560 /// # use std::collections::HashSet;
561 /// # use futures::StreamExt;
562 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
563 /// let tick = process.tick();
564 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
565 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
566 /// stream1.cross_product(stream2)
567 /// # }, |mut stream| async move {
568 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
569 /// # stream.map(|i| assert!(expected.contains(&i)));
570 /// # }));
571 pub fn cross_product<O2>(self, other: Stream<O2, L, B, O>) -> Stream<(T, O2), L, B, NoOrder>
572 where
573 T: Clone,
574 O2: Clone,
575 {
576 check_matching_location(&self.location, &other.location);
577
578 Stream::new(
579 self.location.clone(),
580 HydroNode::CrossProduct {
581 left: Box::new(self.ir_node.into_inner()),
582 right: Box::new(other.ir_node.into_inner()),
583 metadata: self.location.new_node_metadata::<(T, O2)>(),
584 },
585 )
586 }
587
588 /// Takes one stream as input and filters out any duplicate occurrences. The output
589 /// contains all unique values from the input.
590 ///
591 /// # Example
592 /// ```rust
593 /// # use hydro_lang::*;
594 /// # use futures::StreamExt;
595 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
596 /// let tick = process.tick();
597 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
598 /// # }, |mut stream| async move {
599 /// # for w in vec![1, 2, 3, 4] {
600 /// # assert_eq!(stream.next().await.unwrap(), w);
601 /// # }
602 /// # }));
603 pub fn unique(self) -> Stream<T, L, B, O>
604 where
605 T: Eq + Hash,
606 {
607 Stream::new(
608 self.location.clone(),
609 HydroNode::Unique {
610 input: Box::new(self.ir_node.into_inner()),
611 metadata: self.location.new_node_metadata::<T>(),
612 },
613 )
614 }
615
616 /// Outputs everything in this stream that is *not* contained in the `other` stream.
617 ///
618 /// The `other` stream must be [`Bounded`], since this function will wait until
619 /// all its elements are available before producing any output.
620 /// # Example
621 /// ```rust
622 /// # use hydro_lang::*;
623 /// # use futures::StreamExt;
624 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
625 /// let tick = process.tick();
626 /// let stream = unsafe {
627 /// process
628 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
629 /// .tick_batch(&tick)
630 /// };
631 /// let batch = unsafe {
632 /// process
633 /// .source_iter(q!(vec![1, 2]))
634 /// .tick_batch(&tick)
635 /// };
636 /// stream.filter_not_in(batch).all_ticks()
637 /// # }, |mut stream| async move {
638 /// # for w in vec![3, 4] {
639 /// # assert_eq!(stream.next().await.unwrap(), w);
640 /// # }
641 /// # }));
642 pub fn filter_not_in<O2>(self, other: Stream<T, L, Bounded, O2>) -> Stream<T, L, Bounded, O>
643 where
644 T: Eq + Hash,
645 {
646 check_matching_location(&self.location, &other.location);
647
648 Stream::new(
649 self.location.clone(),
650 HydroNode::Difference {
651 pos: Box::new(self.ir_node.into_inner()),
652 neg: Box::new(other.ir_node.into_inner()),
653 metadata: self.location.new_node_metadata::<T>(),
654 },
655 )
656 }
657
    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each item. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let nums = process.source_iter(q!(vec![1, 2]));
    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O>
    where
        F: Fn(&T) + 'a,
    {
        let f = f.splice_fn1_borrow_ctx(&self.location).into();

        if L::is_top_level() {
            // Top-level streams are represented as persisted replays; unwrap
            // with Unpersist, inspect each underlying element once, then
            // re-wrap with Persist so the representation is preserved.
            Stream::new(
                self.location.clone(),
                HydroNode::Persist {
                    inner: Box::new(HydroNode::Inspect {
                        f,
                        input: Box::new(HydroNode::Unpersist {
                            inner: Box::new(self.ir_node.into_inner()),
                            metadata: self.location.new_node_metadata::<T>(),
                        }),
                        metadata: self.location.new_node_metadata::<T>(),
                    }),
                    metadata: self.location.new_node_metadata::<T>(),
                },
            )
        } else {
            // Inside a tick, the stream can be inspected directly.
            Stream::new(
                self.location.clone(),
                HydroNode::Inspect {
                    f,
                    input: Box::new(self.ir_node.into_inner()),
                    metadata: self.location.new_node_metadata::<T>(),
                },
            )
        }
    }
708
709 /// Explicitly "casts" the stream to a type with a different ordering
710 /// guarantee. Useful in unsafe code where the ordering cannot be proven
711 /// by the type-system.
712 ///
713 /// # Safety
714 /// This function is used as an escape hatch, and any mistakes in the
715 /// provided ordering guarantee will propagate into the guarantees
716 /// for the rest of the program.
717 ///
718 /// # Example
719 /// # TODO: more sensible code after Shadaj merges
720 /// ```rust
721 /// # use hydro_lang::*;
722 /// # use std::collections::HashSet;
723 /// # use futures::StreamExt;
724 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
725 /// let nums = process.source_iter(q!({
726 /// let now = std::time::SystemTime::now();
727 /// match now.elapsed().unwrap().as_secs() % 2 {
728 /// 0 => vec![5, 4, 3, 2, 1],
729 /// _ => vec![1, 2, 3, 4, 5],
730 /// }
731 /// .into_iter()
732 /// }));
733 /// // despite being generated by `source_iter`, the order of `nums` across runs is non-deterministic
734 /// let stream = unsafe { nums.assume_ordering::<NoOrder>() };
735 /// stream
736 /// # }, |mut stream| async move {
737 /// # for w in vec![1, 2, 3, 4, 5] {
738 /// # assert!((1..=5).contains(&stream.next().await.unwrap()));
739 /// # }
740 /// # }));
741 /// ```
742 pub unsafe fn assume_ordering<O2>(self) -> Stream<T, L, B, O2> {
743 Stream::new(self.location, self.ir_node.into_inner())
744 }
745}
746
impl<'a, T, L, B, O> Stream<&T, L, B, O>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O>
    where
        T: Clone,
    {
        // Staged identity-clone map: turns `Stream<&T>` into `Stream<T>`.
        self.map(q!(|d| d.clone()))
    }
}
773
// The `MinOrder<NoOrder, Min = NoOrder>` bound restricts this impl to
// streams whose ordering is (or weakens to) `NoOrder`, so only
// order-insensitive aggregations are exposed here.
impl<'a, T, L, B, O> Stream<T, L, B, O>
where
    L: Location<'a>,
    O: MinOrder<NoOrder, Min = NoOrder>,
{
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // 10
    /// # assert_eq!(stream.next().await.unwrap(), 10);
    /// # }));
    /// ```
    pub fn fold_commutative<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L>,
    ) -> Singleton<A, L, B>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        let mut core = HydroNode::Fold {
            init,
            acc: comb,
            input: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata::<A>(),
        };

        if L::is_top_level() {
            // top-level (possibly unbounded) singletons are represented as
            // a stream which produces all values from all ticks every tick,
            // so Unpersist will always give the latest aggregation
            core = HydroNode::Persist {
                inner: Box::new(core),
                metadata: self.location.new_node_metadata::<A>(),
            };
        }

        Singleton::new(self.location, core)
    }
832
833 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
834 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
835 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
836 /// reference, so that it can be modified in place.
837 ///
838 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
839 ///
840 /// # Example
841 /// ```rust
842 /// # use hydro_lang::*;
843 /// # use futures::StreamExt;
844 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
845 /// let tick = process.tick();
846 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
847 /// let batch = unsafe { numbers.tick_batch(&tick) };
848 /// batch
849 /// .reduce_commutative(q!(|curr, new| *curr += new))
850 /// .all_ticks()
851 /// # }, |mut stream| async move {
852 /// // 10
853 /// # assert_eq!(stream.next().await.unwrap(), 10);
854 /// # }));
855 /// ```
856 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
857 where
858 F: Fn(&mut T, T) + 'a,
859 {
860 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
861 let mut core = HydroNode::Reduce {
862 f,
863 input: Box::new(self.ir_node.into_inner()),
864 metadata: self.location.new_node_metadata::<T>(),
865 };
866
867 if L::is_top_level() {
868 core = HydroNode::Persist {
869 inner: Box::new(core),
870 metadata: self.location.new_node_metadata::<T>(),
871 };
872 }
873
874 Optional::new(self.location, core)
875 }
876
    /// Computes the maximum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.max().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// ```
    pub fn max(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // max is a commutative reduction: keep the larger of the two.
        self.reduce_commutative(q!(|curr, new| {
            if new > *curr {
                *curr = new;
            }
        }))
    }
904
905 /// Computes the maximum element in the stream as an [`Optional`], where the
906 /// maximum is determined according to the `key` function. The [`Optional`] will
907 /// be empty until the first element in the input arrives.
908 ///
909 /// # Example
910 /// ```rust
911 /// # use hydro_lang::*;
912 /// # use futures::StreamExt;
913 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
914 /// let tick = process.tick();
915 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
916 /// let batch = unsafe { numbers.tick_batch(&tick) };
917 /// batch.max_by_key(q!(|x| -x)).all_ticks()
918 /// # }, |mut stream| async move {
919 /// // 1
920 /// # assert_eq!(stream.next().await.unwrap(), 1);
921 /// # }));
922 /// ```
923 pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
924 where
925 K: Ord,
926 F: Fn(&T) -> K + 'a,
927 {
928 let f = key.splice_fn1_borrow_ctx(&self.location);
929
930 let wrapped: syn::Expr = parse_quote!({
931 let key_fn = #f;
932 move |curr, new| {
933 if key_fn(&new) > key_fn(&*curr) {
934 *curr = new;
935 }
936 }
937 });
938
939 let mut core = HydroNode::Reduce {
940 f: wrapped.into(),
941 input: Box::new(self.ir_node.into_inner()),
942 metadata: self.location.new_node_metadata::<T>(),
943 };
944
945 if L::is_top_level() {
946 core = HydroNode::Persist {
947 inner: Box::new(core),
948 metadata: self.location.new_node_metadata::<T>(),
949 };
950 }
951
952 Optional::new(self.location, core)
953 }
954
    /// Computes the minimum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.min().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// ```
    pub fn min(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // min is a commutative reduction: keep the smaller of the two.
        self.reduce_commutative(q!(|curr, new| {
            if new < *curr {
                *curr = new;
            }
        }))
    }
982
    /// Computes the number of elements in the stream as a [`Singleton`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.count().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// ```
    pub fn count(self) -> Singleton<usize, L, B> {
        // Counting is a commutative fold, so it is safe on unordered streams.
        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
    }
}
1003
// Operators in this impl require `TotalOrder`, since their results depend on
// the exact arrival order of elements.
impl<'a, T, L, B> Stream<T, L, B, TotalOrder>
where
    L: Location<'a>,
{
    /// Returns a stream with the current count tupled with each element in the input stream.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder>(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// numbers.enumerate()
    /// # }, |mut stream| async move {
    /// // (0, 1), (1, 2), (2, 3), (3, 4)
    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder> {
        if L::is_top_level() {
            // Top-level streams are persisted replays: unwrap with Unpersist,
            // number the underlying elements once (`is_static: true`), then
            // re-wrap with Persist to restore the representation.
            Stream::new(
                self.location.clone(),
                HydroNode::Persist {
                    inner: Box::new(HydroNode::Enumerate {
                        is_static: true,
                        input: Box::new(HydroNode::Unpersist {
                            inner: Box::new(self.ir_node.into_inner()),
                            metadata: self.location.new_node_metadata::<T>(),
                        }),
                        metadata: self.location.new_node_metadata::<(usize, T)>(),
                    }),
                    metadata: self.location.new_node_metadata::<(usize, T)>(),
                },
            )
        } else {
            // Inside a tick, elements are numbered per tick batch.
            Stream::new(
                self.location.clone(),
                HydroNode::Enumerate {
                    is_static: false,
                    input: Box::new(self.ir_node.into_inner()),
                    metadata: self.location.new_node_metadata::<(usize, T)>(),
                },
            )
        }
    }
1052
1053 /// Computes the first element in the stream as an [`Optional`], which
1054 /// will be empty until the first element in the input arrives.
1055 ///
1056 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1057 /// re-ordering of elements may cause the first element to change.
1058 ///
1059 /// # Example
1060 /// ```rust
1061 /// # use hydro_lang::*;
1062 /// # use futures::StreamExt;
1063 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1064 /// let tick = process.tick();
1065 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1066 /// let batch = unsafe { numbers.tick_batch(&tick) };
1067 /// batch.first().all_ticks()
1068 /// # }, |mut stream| async move {
1069 /// // 1
1070 /// # assert_eq!(stream.next().await.unwrap(), 1);
1071 /// # }));
1072 /// ```
1073 pub fn first(self) -> Optional<T, L, B> {
1074 Optional::new(self.location, self.ir_node.into_inner())
1075 }
1076
1077 /// Computes the last element in the stream as an [`Optional`], which
1078 /// will be empty until an element in the input arrives.
1079 ///
1080 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1081 /// re-ordering of elements may cause the last element to change.
1082 ///
1083 /// # Example
1084 /// ```rust
1085 /// # use hydro_lang::*;
1086 /// # use futures::StreamExt;
1087 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1088 /// let tick = process.tick();
1089 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1090 /// let batch = unsafe { numbers.tick_batch(&tick) };
1091 /// batch.last().all_ticks()
1092 /// # }, |mut stream| async move {
1093 /// // 4
1094 /// # assert_eq!(stream.next().await.unwrap(), 4);
1095 /// # }));
1096 /// ```
1097 pub fn last(self) -> Optional<T, L, B> {
1098 self.reduce(q!(|curr, new| *curr = new))
1099 }
1100
1101 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1102 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1103 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1104 ///
1105 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1106 /// to depend on the order of elements in the stream.
1107 ///
1108 /// # Example
1109 /// ```rust
1110 /// # use hydro_lang::*;
1111 /// # use futures::StreamExt;
1112 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1113 /// let tick = process.tick();
1114 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1115 /// let batch = unsafe { words.tick_batch(&tick) };
1116 /// batch
1117 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1118 /// .all_ticks()
1119 /// # }, |mut stream| async move {
1120 /// // "HELLOWORLD"
1121 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1122 /// # }));
1123 /// ```
1124 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1125 self,
1126 init: impl IntoQuotedMut<'a, I, L>,
1127 comb: impl IntoQuotedMut<'a, F, L>,
1128 ) -> Singleton<A, L, B> {
1129 let init = init.splice_fn0_ctx(&self.location).into();
1130 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1131
1132 let mut core = HydroNode::Fold {
1133 init,
1134 acc: comb,
1135 input: Box::new(self.ir_node.into_inner()),
1136 metadata: self.location.new_node_metadata::<A>(),
1137 };
1138
1139 if L::is_top_level() {
1140 // top-level (possibly unbounded) singletons are represented as
1141 // a stream which produces all values from all ticks every tick,
1142 // so Unpersist will always give the lastest aggregation
1143 core = HydroNode::Persist {
1144 inner: Box::new(core),
1145 metadata: self.location.new_node_metadata::<A>(),
1146 };
1147 }
1148
1149 Singleton::new(self.location, core)
1150 }
1151
1152 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1153 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1154 /// until the first element in the input arrives.
1155 ///
1156 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1157 /// to depend on the order of elements in the stream.
1158 ///
1159 /// # Example
1160 /// ```rust
1161 /// # use hydro_lang::*;
1162 /// # use futures::StreamExt;
1163 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1164 /// let tick = process.tick();
1165 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1166 /// let batch = unsafe { words.tick_batch(&tick) };
1167 /// batch
1168 /// .map(q!(|x| x.to_string()))
1169 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1170 /// .all_ticks()
1171 /// # }, |mut stream| async move {
1172 /// // "HELLOWORLD"
1173 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1174 /// # }));
1175 /// ```
1176 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1177 self,
1178 comb: impl IntoQuotedMut<'a, F, L>,
1179 ) -> Optional<T, L, B> {
1180 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1181 let mut core = HydroNode::Reduce {
1182 f,
1183 input: Box::new(self.ir_node.into_inner()),
1184 metadata: self.location.new_node_metadata::<T>(),
1185 };
1186
1187 if L::is_top_level() {
1188 core = HydroNode::Persist {
1189 inner: Box::new(core),
1190 metadata: self.location.new_node_metadata::<T>(),
1191 };
1192 }
1193
1194 Optional::new(self.location, core)
1195 }
1196}
1197
impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O> Stream<T, L, Unbounded, O> {
    /// Produces a new stream that interleaves the elements of the two input streams.
    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
    ///
    /// Currently, both input streams must be [`Unbounded`]. When the streams are
    /// [`Bounded`], you can use [`Stream::chain`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// numbers.clone().map(q!(|x| x + 1)).union(numbers)
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn union<O2>(self, other: Stream<T, L, Unbounded, O2>) -> Stream<T, L, Unbounded, NoOrder> {
        // A fresh tick is used to batch both inputs so their per-tick batches
        // can be chained pairwise, then flattened back into one stream.
        let tick = self.location.tick();
        unsafe {
            // SAFETY: Because the outputs are unordered,
            // we can interleave batches from both streams.
            self.tick_batch(&tick)
                .assume_ordering::<NoOrder>()
                .chain(other.tick_batch(&tick).assume_ordering::<NoOrder>())
                .all_ticks()
                .assume_ordering()
        }
    }
}
1232
impl<'a, T, L, O> Stream<T, L, Bounded, O>
where
    L: Location<'a>,
{
    /// Produces a new stream that emits the input elements in sorted order.
    ///
    /// The input stream can have any ordering guarantee, but the output stream
    /// will have a [`TotalOrder`] guarantee. This operator will block until all
    /// elements in the input stream are available, so it requires the input stream
    /// to be [`Bounded`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.sort().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder>
    where
        T: Ord,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Sort {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Produces a new stream that first emits the elements of the `self` stream,
    /// and then emits the elements of the `other` stream. The output stream has
    /// a [`TotalOrder`] guarantee if and only if both input streams have a
    /// [`TotalOrder`] guarantee.
    ///
    /// Currently, both input streams must be [`Bounded`]. This operator will block
    /// on the first stream until all its elements are available. In a future version,
    /// we will relax the requirement on the `other` stream.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, 1, 2, 3, 4
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn chain<O2>(self, other: Stream<T, L, Bounded, O2>) -> Stream<T, L, Bounded, O::Min>
    where
        O: MinOrder<O2>,
    {
        // Both inputs must live at the same location; this panics otherwise.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.into_inner()),
                second: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }
}
1314
impl<'a, K, V1, L, B, O> Stream<(K, V1), L, B, O>
where
    L: Location<'a>,
{
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// The output has [`NoOrder`] since the join may emit matches in any order.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// ```
    pub fn join<V2, O2>(self, n: Stream<(K, V2), L, B, O2>) -> Stream<(K, (V1, V2)), L, B, NoOrder>
    where
        K: Eq + Hash,
    {
        // Both inputs must live at the same location; this panics otherwise.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Join {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(n.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
            },
        )
    }

    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = unsafe {
    ///     process
    ///         .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///         .tick_batch(&tick)
    /// };
    /// let batch = unsafe {
    ///     process
    ///         .source_iter(q!(vec![1, 2]))
    ///         .tick_batch(&tick)
    /// };
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// // (3, 'c'), (4, 'd')
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn anti_join<O2>(self, n: Stream<K, L, Bounded, O2>) -> Stream<(K, V1), L, B, O>
    where
        K: Eq + Hash,
    {
        // Both inputs must live at the same location; this panics otherwise.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.into_inner()),
                neg: Box::new(n.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, V1)>(),
            },
        )
    }
}
1396
impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded>
where
    K: Eq + Hash,
    L: Location<'a>,
{
    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
    /// in the second element are accumulated via the `comb` closure.
    ///
    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
    /// to depend on the order of elements in the stream.
    ///
    /// If the input and output value types are the same and do not require initialization then use
    /// [`Stream::reduce_keyed`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 5), (2, 7)
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
    /// # }));
    /// ```
    pub fn fold_keyed<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, Tick<L>>,
        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
    ) -> Stream<(K, A), Tick<L>, Bounded>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) + 'a,
    {
        // Splice the staged closures in the context of this tick's location.
        let init = init.splice_fn0_ctx(&self.location).into();
        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::FoldKeyed {
                init,
                acc: comb,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, A)>(),
            },
        )
    }

    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
    /// in the second element are accumulated via the `comb` closure.
    ///
    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
    /// to depend on the order of elements in the stream.
    ///
    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 5), (2, 7)
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
    /// # }));
    /// ```
    pub fn reduce_keyed<F>(
        self,
        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
    ) -> Stream<(K, V), Tick<L>, Bounded>
    where
        F: Fn(&mut V, V) + 'a,
    {
        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::ReduceKeyed {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, V)>(),
            },
        )
    }
}
1495
impl<'a, K, V, L, O> Stream<(K, V), Tick<L>, Bounded, O>
where
    K: Eq + Hash,
    L: Location<'a>,
{
    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
    /// in the second element are accumulated via the `comb` closure.
    ///
    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
    ///
    /// If the input and output value types are the same and do not require initialization then use
    /// [`Stream::reduce_keyed_commutative`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 5), (2, 7)
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
    /// # }));
    /// ```
    pub fn fold_keyed_commutative<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, Tick<L>>,
        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
    ) -> Stream<(K, A), Tick<L>, Bounded, O>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) + 'a,
    {
        // Splice the staged closures in the context of this tick's location.
        let init = init.splice_fn0_ctx(&self.location).into();
        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::FoldKeyed {
                init,
                acc: comb,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, A)>(),
            },
        )
    }

    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.keys().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # assert_eq!(stream.next().await.unwrap(), 2);
    /// # }));
    /// ```
    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, O> {
        // Fold each key's values into `()` so duplicate keys collapse to one
        // entry, then discard the unit value, keeping only the key.
        self.fold_keyed_commutative(q!(|| ()), q!(|_, _| {}))
            .map(q!(|(k, _)| k))
    }

    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
    /// in the second element are accumulated via the `comb` closure.
    ///
    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
    ///
    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 5), (2, 7)
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
    /// # }));
    /// ```
    pub fn reduce_keyed_commutative<F>(
        self,
        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
    ) -> Stream<(K, V), Tick<L>, Bounded, O>
    where
        F: Fn(&mut V, V) + 'a,
    {
        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::ReduceKeyed {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, V)>(),
            },
        )
    }
}
1615
impl<'a, T, L, B, O> Stream<T, Atomic<L>, B, O>
where
    L: Location<'a> + NoTick,
{
    /// Returns a stream corresponding to the latest batch of elements being atomically
    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Safety
    /// The batch boundaries are non-deterministic and may change across executions.
    pub unsafe fn tick_batch(self) -> Stream<T, Tick<L>, Bounded, O> {
        // Unpersist yields just the current tick's batch at the tick location.
        Stream::new(
            self.location.clone().tick,
            HydroNode::Unpersist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Exits the atomic section, returning the stream at the underlying
    /// (non-atomic) location without changing the IR.
    pub fn end_atomic(self) -> Stream<T, L, B, O> {
        Stream::new(self.location.tick.l, self.ir_node.into_inner())
    }

    /// Returns the [`Tick`] whose atomic section this stream is part of.
    pub fn atomic_source(&self) -> Tick<L> {
        self.location.tick.clone()
    }
}
1644
impl<'a, T, L, B, O> Stream<T, L, B, O>
where
    L: Location<'a> + NoTick + NoAtomic,
{
    /// Shifts this stream into the atomic section associated with `tick`,
    /// without changing the underlying IR node. See [`Stream::tick_batch`]
    /// (on the atomic stream) and `end_atomic` for entering/leaving ticks.
    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O> {
        Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
    }

    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced as available, regardless of input arrival order.
    ///
    /// # Example
    /// ```rust
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::*;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures()
    /// # },
    /// # |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// # let mut output = HashSet::new();
    /// # for _ in 1..10 {
    /// #     output.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     HashSet::<i32>::from_iter(1..10)
    /// # );
    /// # },
    /// # ));
    /// ```
    pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder>
    where
        T: Future<Output = T2>,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFutures {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T2>(),
            },
        )
    }

    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Safety
    /// The batch boundaries are non-deterministic and may change across executions.
    pub unsafe fn tick_batch(self, tick: &Tick<L>) -> Stream<T, Tick<L>, Bounded, O> {
        // Delegates to the atomic variant: enter the tick's atomic section,
        // then take the current batch.
        unsafe { self.atomic(tick).tick_batch() }
    }

    /// Given a time interval, returns a stream corresponding to samples taken from the
    /// stream roughly at that interval. The output will have elements in the same order
    /// as the input, but with arbitrary elements skipped between samples. There is also
    /// no guarantee on the exact timing of the samples.
    ///
    /// # Safety
    /// The output stream is non-deterministic in which elements are sampled, since this
    /// is controlled by a clock.
    pub unsafe fn sample_every(
        self,
        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
    ) -> Stream<T, L, Unbounded, O> {
        let samples = unsafe {
            // SAFETY: source of intentional non-determinism
            self.location.source_interval(interval)
        };

        // Emit a tick's batch only when the interval timer fired during it.
        let tick = self.location.tick();
        unsafe {
            // SAFETY: source of intentional non-determinism
            self.tick_batch(&tick)
                .continue_if(samples.tick_batch(&tick).first())
                .all_ticks()
        }
    }

    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Safety
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub unsafe fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
    ) -> Optional<(), L, Unbounded>
    where
        O: MinOrder<NoOrder, Min = NoOrder>,
    {
        let tick = self.location.tick();

        // Track the wall-clock time of the most recent element (None until
        // the first element arrives).
        let latest_received = self.fold_commutative(
            q!(|| None),
            q!(|latest, _| {
                *latest = Some(Instant::now());
            }),
        );

        // Timed out if we have never received an element, or if the last
        // element is older than `duration`.
        unsafe {
            // SAFETY: Non-deterministic delay in detecting a timeout is expected.
            latest_received.latest_tick(&tick)
        }
        .filter_map(q!(move |latest_received| {
            if let Some(latest_received) = latest_received {
                if Instant::now().duration_since(latest_received) > duration {
                    Some(())
                } else {
                    None
                }
            } else {
                Some(())
            }
        }))
        .latest()
    }
}
1773
impl<'a, F, T, L, B, O> Stream<F, L, B, O>
where
    L: Location<'a> + NoTick + NoAtomic,
    F: Future<Output = T>,
{
    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced in the same order as the input stream.
    ///
    /// Because output order is preserved, the output keeps the input's
    /// ordering guarantee `O` (unlike [`Stream::resolve_futures`]).
    ///
    /// # Example
    /// ```rust
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::*;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_ordered()
    /// # },
    /// # |mut stream| async move {
    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
    /// # let mut output = Vec::new();
    /// # for _ in 1..10 {
    /// #     output.push(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
    /// # );
    /// # },
    /// # ));
    /// ```
    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O> {
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesOrdered {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }
}
1817
impl<'a, T, L, B, O> Stream<T, L, B, O>
where
    L: Location<'a> + NoTick,
{
    /// Terminates the dataflow for this stream by invoking `f` on each element
    /// as it arrives. Registers a `ForEach` leaf in the flow graph; nothing
    /// runs until the flow is built and deployed.
    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
        let f = f.splice_fn1_ctx(&self.location).into();
        let metadata = self.location.new_node_metadata::<T>();
        self.location
            .flow_state()
            .borrow_mut()
            .leaves
            .as_mut()
            // Panics if the flow has already been consumed (see FLOW_USED_MESSAGE).
            .expect(FLOW_USED_MESSAGE)
            .push(HydroLeaf::ForEach {
                // Unpersist so `f` observes each underlying element once.
                input: Box::new(HydroNode::Unpersist {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: metadata.clone(),
                }),
                f,
                metadata,
            });
    }

    /// Terminates the dataflow for this stream by forwarding each element into
    /// the provided [`futures::Sink`] (e.g. an external channel or transport).
    /// Registers a `DestSink` leaf in the flow graph.
    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
    where
        S: 'a + futures::Sink<T> + Unpin,
    {
        self.location
            .flow_state()
            .borrow_mut()
            .leaves
            .as_mut()
            // Panics if the flow has already been consumed (see FLOW_USED_MESSAGE).
            .expect(FLOW_USED_MESSAGE)
            .push(HydroLeaf::DestSink {
                sink: sink.splice_typed_ctx(&self.location).into(),
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            });
    }
}
1858
impl<'a, T, L, O> Stream<T, Tick<L>, Bounded, O>
where
    L: Location<'a>,
{
    /// Moves this per-tick stream back out of the tick, producing an
    /// [`Unbounded`] stream at the enclosing location that emits every tick's
    /// batch of elements (implemented by wrapping the node in `Persist`).
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O> {
        Stream::new(
            self.location.outer().clone(),
            HydroNode::Persist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Like [`Stream::all_ticks`], but the resulting stream stays inside the
    /// atomic section of this tick rather than escaping to the outer location.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O> {
        Stream::new(
            Atomic {
                tick: self.location.clone(),
            },
            HydroNode::Persist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Persists the elements across ticks, so each subsequent tick also
    /// observes elements from earlier ticks. Requires `T: Clone` because
    /// elements are re-emitted on later ticks.
    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O>
    where
        T: Clone,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Persist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Delays this stream by one tick: elements become available in the
    /// following tick instead of the current one (wraps in `DeferTick`).
    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O> {
        Stream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Wraps this stream in a `Delta` IR node.
    /// NOTE(review): semantics inferred from the node name — presumably emits
    /// only elements new since the previous tick; confirm against the IR docs.
    pub fn delta(self) -> Stream<T, Tick<L>, Bounded, O> {
        Stream::new(
            self.location.clone(),
            HydroNode::Delta {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }
}
1918
1919pub fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
1920 let root = get_this_crate();
1921
1922 if is_demux {
1923 parse_quote! {
1924 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::ClusterId<_>, #t_type), _>(
1925 |(id, data)| {
1926 (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
1927 }
1928 )
1929 }
1930 } else {
1931 parse_quote! {
1932 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
1933 |data| {
1934 #root::runtime_support::bincode::serialize(&data).unwrap().into()
1935 }
1936 )
1937 }
1938 }
1939}
1940
1941fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
1942 serialize_bincode_with_type(is_demux, &stageleft::quote_type::<T>())
1943}
1944
1945pub fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
1946 let root = get_this_crate();
1947
1948 if let Some(c_type) = tagged {
1949 parse_quote! {
1950 |res| {
1951 let (id, b) = res.unwrap();
1952 (#root::ClusterId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
1953 }
1954 }
1955 } else {
1956 parse_quote! {
1957 |res| {
1958 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
1959 }
1960 }
1961 }
1962}
1963
1964pub(super) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
1965 deserialize_bincode_with_type(tagged, &stageleft::quote_type::<T>())
1966}
1967
1968impl<'a, T, L, B, O> Stream<T, L, B, O>
1969where
1970 L: Location<'a> + NoTick,
1971{
1972 pub fn send_bincode<L2, CoreType>(
1973 self,
1974 other: &L2,
1975 ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<CoreType>, L2, Unbounded, O::Min>
1976 where
1977 L::Root: CanSend<'a, L2, In<CoreType> = T>,
1978 L2: Location<'a>,
1979 CoreType: Serialize + DeserializeOwned,
1980 O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
1981 {
1982 let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::Root::is_demux()));
1983
1984 let deserialize_pipeline = Some(deserialize_bincode::<CoreType>(
1985 L::Root::tagged_type().as_ref(),
1986 ));
1987
1988 Stream::new(
1989 other.clone(),
1990 HydroNode::Network {
1991 from_key: None,
1992 to_location: other.id(),
1993 to_key: None,
1994 serialize_fn: serialize_pipeline.map(|e| e.into()),
1995 instantiate_fn: DebugInstantiate::Building,
1996 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
1997 input: Box::new(self.ir_node.into_inner()),
1998 metadata: other.new_node_metadata::<CoreType>(),
1999 },
2000 )
2001 }
2002
2003 pub fn send_bincode_external<L2, CoreType>(
2004 self,
2005 other: &ExternalProcess<L2>,
2006 ) -> ExternalBincodeStream<L::Out<CoreType>>
2007 where
2008 L: CanSend<'a, ExternalProcess<'a, L2>, In<CoreType> = T, Out<CoreType> = CoreType>,
2009 L2: 'a,
2010 CoreType: Serialize + DeserializeOwned,
2011 // for now, we restirct Out<CoreType> to be CoreType, which means no tagged cluster -> external
2012 {
2013 let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::is_demux()));
2014
2015 let metadata = other.new_node_metadata::<CoreType>();
2016
2017 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
2018
2019 let external_key = flow_state_borrow.next_external_out;
2020 flow_state_borrow.next_external_out += 1;
2021
2022 let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
2023
2024 let dummy_f: syn::Expr = syn::parse_quote!(());
2025
2026 leaves.push(HydroLeaf::ForEach {
2027 f: dummy_f.into(),
2028 input: Box::new(HydroNode::Network {
2029 from_key: None,
2030 to_location: other.id(),
2031 to_key: Some(external_key),
2032 serialize_fn: serialize_pipeline.map(|e| e.into()),
2033 instantiate_fn: DebugInstantiate::Building,
2034 deserialize_fn: None,
2035 input: Box::new(self.ir_node.into_inner()),
2036 metadata: metadata.clone(),
2037 }),
2038 metadata,
2039 });
2040
2041 ExternalBincodeStream {
2042 process_id: other.id,
2043 port_id: external_key,
2044 _phantom: PhantomData,
2045 }
2046 }
2047
2048 pub fn send_bytes<L2>(
2049 self,
2050 other: &L2,
2051 ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<Bytes>, L2, Unbounded, O::Min>
2052 where
2053 L2: Location<'a>,
2054 L::Root: CanSend<'a, L2, In<Bytes> = T>,
2055 O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2056 {
2057 let root = get_this_crate();
2058 Stream::new(
2059 other.clone(),
2060 HydroNode::Network {
2061 from_key: None,
2062 to_location: other.id(),
2063 to_key: None,
2064 serialize_fn: None,
2065 instantiate_fn: DebugInstantiate::Building,
2066 deserialize_fn: if let Some(c_type) = L::Root::tagged_type() {
2067 let expr: syn::Expr = parse_quote!(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()));
2068 Some(expr.into())
2069 } else {
2070 let expr: syn::Expr = parse_quote!(|b| b.unwrap().freeze());
2071 Some(expr.into())
2072 },
2073 input: Box::new(self.ir_node.into_inner()),
2074 metadata: other.new_node_metadata::<Bytes>(),
2075 },
2076 )
2077 }
2078
2079 pub fn send_bytes_external<L2>(self, other: &ExternalProcess<L2>) -> ExternalBytesPort
2080 where
2081 L2: 'a,
2082 L::Root: CanSend<'a, ExternalProcess<'a, L2>, In<Bytes> = T, Out<Bytes> = Bytes>,
2083 {
2084 let metadata = other.new_node_metadata::<Bytes>();
2085
2086 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
2087 let external_key = flow_state_borrow.next_external_out;
2088 flow_state_borrow.next_external_out += 1;
2089
2090 let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
2091
2092 let dummy_f: syn::Expr = syn::parse_quote!(());
2093
2094 leaves.push(HydroLeaf::ForEach {
2095 f: dummy_f.into(),
2096 input: Box::new(HydroNode::Network {
2097 from_key: None,
2098 to_location: other.id(),
2099 to_key: Some(external_key),
2100 serialize_fn: None,
2101 instantiate_fn: DebugInstantiate::Building,
2102 deserialize_fn: None,
2103 input: Box::new(self.ir_node.into_inner()),
2104 metadata: metadata.clone(),
2105 }),
2106 metadata,
2107 });
2108
2109 ExternalBytesPort {
2110 process_id: other.id,
2111 port_id: external_key,
2112 }
2113 }
2114
2115 pub fn send_bincode_anonymous<L2, Tag, CoreType>(
2116 self,
2117 other: &L2,
2118 ) -> Stream<CoreType, L2, Unbounded, O::Min>
2119 where
2120 L2: Location<'a>,
2121 L::Root: CanSend<'a, L2, In<CoreType> = T, Out<CoreType> = (Tag, CoreType)>,
2122 CoreType: Serialize + DeserializeOwned,
2123 O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2124 {
2125 self.send_bincode::<L2, CoreType>(other).map(q!(|(_, b)| b))
2126 }
2127
2128 pub fn send_bytes_anonymous<L2, Tag>(self, other: &L2) -> Stream<Bytes, L2, Unbounded, O::Min>
2129 where
2130 L2: Location<'a>,
2131 L::Root: CanSend<'a, L2, In<Bytes> = T, Out<Bytes> = (Tag, Bytes)>,
2132 O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2133 {
2134 self.send_bytes::<L2>(other).map(q!(|(_, b)| b))
2135 }
2136
2137 #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
2138 pub fn broadcast_bincode<C2>(
2139 self,
2140 other: &Cluster<'a, C2>,
2141 ) -> Stream<<L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>, Cluster<'a, C2>, Unbounded, O::Min>
2142 where
2143 C2: 'a,
2144 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
2145 T: Clone + Serialize + DeserializeOwned,
2146 O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2147 {
2148 let ids = other.members();
2149
2150 let to_send: Stream<(u32, Bytes), L, B, O> = self
2151 .map::<Bytes, _>(q!(|v| bincode::serialize(&v).unwrap().into()))
2152 .flat_map_ordered(q!(|v| { ids.iter().map(move |id| (id.raw_id, v.clone())) }));
2153
2154 let deserialize_pipeline = Some(deserialize_bincode::<T>(L::Root::tagged_type().as_ref()));
2155
2156 Stream::new(
2157 other.clone(),
2158 HydroNode::Network {
2159 from_key: None,
2160 to_location: other.id(),
2161 to_key: None,
2162 serialize_fn: None,
2163 instantiate_fn: DebugInstantiate::Building,
2164 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
2165 input: Box::new(to_send.ir_node.into_inner()),
2166 metadata: other.new_node_metadata::<T>(),
2167 },
2168 )
2169 }
2170
2171 pub fn broadcast_bincode_anonymous<C2, Tag>(
2172 self,
2173 other: &Cluster<'a, C2>,
2174 ) -> Stream<T, Cluster<'a, C2>, Unbounded, O::Min>
2175 where
2176 C2: 'a,
2177 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
2178 T: Clone + Serialize + DeserializeOwned,
2179 O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2180 {
2181 self.broadcast_bincode(other).map(q!(|(_, b)| b))
2182 }
2183
2184 #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
2185 pub fn broadcast_bytes<C2>(
2186 self,
2187 other: &Cluster<'a, C2>,
2188 ) -> Stream<
2189 <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2190 Cluster<'a, C2>,
2191 Unbounded,
2192 O::Min,
2193 >
2194 where
2195 C2: 'a,
2196 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)>,
2197 T: Clone,
2198 O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2199 {
2200 let ids = other.members();
2201
2202 self.flat_map_ordered(q!(|b| ids.iter().map(move |id| (
2203 ::std::clone::Clone::clone(id),
2204 ::std::clone::Clone::clone(&b)
2205 ))))
2206 .send_bytes(other)
2207 }
2208
2209 pub fn broadcast_bytes_anonymous<C2, Tag>(
2210 self,
2211 other: &Cluster<'a, C2>,
2212 ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded, O::Min>
2213 where
2214 C2: 'a,
2215 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2216 + 'a,
2217 T: Clone,
2218 O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2219 {
2220 self.broadcast_bytes(other).map(q!(|(_, b)| b))
2221 }
2222}
2223
// Round-robin distribution operators. These require a `TotalOrder` input
// stream because the member chosen for each element is derived from the
// element's position (`enumerate`), which is only deterministic when the
// input order is.
#[expect(clippy::type_complexity, reason = "ordering semantics for round-robin")]
impl<'a, T, L, B> Stream<T, L, B, TotalOrder>
where
    L: Location<'a> + NoTick,
{
    /// Distributes elements across the members of `other` in round-robin
    /// fashion (element `i` goes to member `i % members.len()`), serialized
    /// with bincode via [`Stream::send_bincode`].
    pub fn round_robin_bincode<C2>(
        self,
        other: &Cluster<'a, C2>,
    ) -> Stream<
        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
        Cluster<'a, C2>,
        Unbounded,
        <TotalOrder as MinOrder<
            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
        >>::Min,
    >
    where
        C2: 'a,
        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
        T: Clone + Serialize + DeserializeOwned,
        TotalOrder:
            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
    {
        let ids = other.members();

        self.enumerate()
            .map(q!(|(i, w)| (ids[i % ids.len()], w)))
            .send_bincode(other)
    }

    /// Like [`Self::round_robin_bincode`], but drops the sender tag on the
    /// receivers, yielding just the payload.
    pub fn round_robin_bincode_anonymous<C2, Tag>(
        self,
        other: &Cluster<'a, C2>,
    ) -> Stream<
        T,
        Cluster<'a, C2>,
        Unbounded,
        <TotalOrder as MinOrder<
            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
        >>::Min,
    >
    where
        C2: 'a,
        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
        T: Clone + Serialize + DeserializeOwned,
        TotalOrder:
            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
    {
        self.round_robin_bincode(other).map(q!(|(_, b)| b))
    }

    /// Distributes raw-bytes elements across the members of `other` in
    /// round-robin fashion, via [`Stream::send_bytes`] (no serialization
    /// stage).
    pub fn round_robin_bytes<C2>(
        self,
        other: &Cluster<'a, C2>,
    ) -> Stream<
        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
        Cluster<'a, C2>,
        Unbounded,
        <TotalOrder as MinOrder<
            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
        >>::Min,
    >
    where
        C2: 'a,
        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
        T: Clone,
        TotalOrder:
            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
    {
        let ids = other.members();

        self.enumerate()
            .map(q!(|(i, w)| (ids[i % ids.len()], w)))
            .send_bytes(other)
    }

    /// Like [`Self::round_robin_bytes`], but drops the sender tag on the
    /// receivers.
    pub fn round_robin_bytes_anonymous<C2, Tag>(
        self,
        other: &Cluster<'a, C2>,
    ) -> Stream<
        Bytes,
        Cluster<'a, C2>,
        Unbounded,
        <TotalOrder as MinOrder<
            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
        >>::Min,
    >
    where
        C2: 'a,
        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
            + 'a,
        T: Clone,
        TotalOrder:
            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
    {
        self.round_robin_bytes(other).map(q!(|(_, b)| b))
    }
}
2322
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::FlowBuilder;
    use crate::location::Location;

    // Marker types identifying the two processes in the test flow.
    struct P1 {}
    struct P2 {}

    // Payload type exercised over the network; must be serde-serializable
    // for `send_bincode`.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// End-to-end test: 0..10 generated on P1, sent to P2 over bincode, then
    /// forwarded to an external port and asserted in order on this side.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        // Build the dataflow: two processes plus an external connection point.
        let flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external_process::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&second_node)
            .send_bincode_external(&external);

        // Map every location onto localhost and materialize the deployment.
        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect the external receive side before starting the flow so no
        // output is dropped.
        let mut external_out = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        // P1 -> P2 -> external preserves the TotalOrder, so values arrive 0..10.
        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
}