// hydro_lang/stream.rs
1use std::cell::RefCell;
2use std::future::Future;
3use std::hash::Hash;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use bytes::Bytes;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12use syn::parse_quote;
13use tokio::time::Instant;
14
15use crate::builder::FLOW_USED_MESSAGE;
16use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
17use crate::ir::{DebugInstantiate, HydroLeaf, HydroNode, TeeNode};
18use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{
21 CanSend, ExternalProcess, Location, LocationId, NoTick, Tick, check_matching_location,
22};
23use crate::staging_util::get_this_crate;
24use crate::{Bounded, Cluster, ClusterId, Optional, Singleton, Unbounded};
25
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited marker type; it exists only at the type level.
pub enum TotalOrder {}
30
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an uninhabited marker type; it exists only at the type level.
pub enum NoOrder {}
37
/// Helper trait for determining the weakest of two orderings.
#[sealed::sealed]
pub trait MinOrder<Other> {
    /// The weaker of the two orderings.
    type Min;
}

/// Any ordering is its own minimum when compared with itself.
#[sealed::sealed]
impl<T> MinOrder<T> for T {
    type Min = T;
}

/// [`NoOrder`] is weaker than [`TotalOrder`].
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

/// [`NoOrder`] is weaker than [`TotalOrder`].
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}
59
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited marker type; it exists only at the type level.
pub enum ExactlyOnce {}
63
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited marker type; it exists only at the type level.
pub enum AtLeastOnce {}
67
/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other> {
    /// The weaker of the two retry guarantees.
    type Min;
}

/// Any retry guarantee is its own minimum when compared with itself.
#[sealed::sealed]
impl<T> MinRetries<T> for T {
    type Min = T;
}

/// [`AtLeastOnce`] is weaker than [`ExactlyOnce`].
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

/// [`AtLeastOnce`] is weaker than [`ExactlyOnce`].
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = ExactlyOnce;
}
89
/// An ordered sequence stream of elements of type `T`.
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`]
/// or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`]
/// or [`NoOrder`] (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`]
/// or [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<Type, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> {
    /// The location (e.g. process, cluster, or tick) where this stream materializes.
    location: Loc,
    /// The IR node that computes this stream; interior mutability allows
    /// [`Clone`] to retarget it to a shared `Tee` node.
    pub(crate) ir_node: RefCell<HydroNode>,

    _phantom: PhantomData<(Type, Loc, Bound, Order, Retries)>,
}
105
106impl<'a, T, L, O, R> From<Stream<T, L, Bounded, O, R>> for Stream<T, L, Unbounded, O, R>
107where
108 L: Location<'a>,
109{
110 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
111 Stream {
112 location: stream.location,
113 ir_node: stream.ir_node,
114 _phantom: PhantomData,
115 }
116 }
117}
118
119impl<'a, T, L, B, R> From<Stream<T, L, B, TotalOrder, R>> for Stream<T, L, B, NoOrder, R>
120where
121 L: Location<'a>,
122{
123 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
124 Stream {
125 location: stream.location,
126 ir_node: stream.ir_node,
127 _phantom: PhantomData,
128 }
129 }
130}
131
132impl<'a, T, L, B, O> From<Stream<T, L, B, O, ExactlyOnce>> for Stream<T, L, B, O, AtLeastOnce>
133where
134 L: Location<'a>,
135{
136 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
137 Stream {
138 location: stream.location,
139 ir_node: stream.ir_node,
140 _phantom: PhantomData,
141 }
142 }
143}
144
impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Returns the [`LocationId`] of the location this stream materializes at.
    fn location_kind(&self) -> LocationId {
        self.location.id()
    }
}
153
impl<'a, T, L, O, R> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Delegates to [`Stream::defer_tick`].
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
162
impl<'a, T, L, O, R> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    /// Creates the source end of a tick-local cycle: a `CycleSource` IR node
    /// tagged with `ident`, to be terminated later by a matching `CycleSink`
    /// (see the `CycleComplete` impl below).
    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
        let location_id = location.id();
        Stream::new(
            location.clone(),
            HydroNode::CycleSource {
                ident,
                location_kind: location_id,
                metadata: location.new_node_metadata::<T>(),
            },
        )
    }
}
181
impl<'a, T, L, O, R> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Closes a tick-local cycle by registering a `CycleSink` leaf (with the
    /// same `ident` as the matching `CycleSource`) whose input is this stream.
    ///
    /// Panics if `expected_location` does not match this stream's location.
    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
        assert_eq!(
            self.location.id(),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .leaves
            .as_mut()
            // `leaves` is None once the flow has been consumed; see FLOW_USED_MESSAGE
            .expect(FLOW_USED_MESSAGE)
            .push(HydroLeaf::CycleSink {
                ident,
                location_kind: self.location_kind(),
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            });
    }
}
206
impl<'a, T, L, B, O, R> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    type Location = L;

    /// Creates the source end of a forward reference at a top-level (non-tick)
    /// location. Unlike the tick-local version, the `CycleSource` node is
    /// wrapped in a `Persist` node; the matching `CycleComplete` impl strips a
    /// corresponding `Unpersist` from the sink side.
    fn create_source(ident: syn::Ident, location: L) -> Self {
        let location_id = location.id();

        Stream::new(
            location.clone(),
            HydroNode::Persist {
                inner: Box::new(HydroNode::CycleSource {
                    ident,
                    location_kind: location_id,
                    metadata: location.new_node_metadata::<T>(),
                }),
                metadata: location.new_node_metadata::<T>(),
            },
        )
    }
}
229
impl<'a, T, L, B, O, R> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    /// Closes a forward reference by registering a `CycleSink` leaf whose input
    /// is this stream wrapped in `Unpersist` — the dual of the `Persist`
    /// wrapper added by `create_source`.
    ///
    /// Panics if `expected_location` does not match this stream's location.
    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
        assert_eq!(
            self.location.id(),
            expected_location,
            "locations do not match"
        );
        let metadata = self.location.new_node_metadata::<T>();
        self.location
            .flow_state()
            .borrow_mut()
            .leaves
            .as_mut()
            // `leaves` is None once the flow has been consumed; see FLOW_USED_MESSAGE
            .expect(FLOW_USED_MESSAGE)
            .push(HydroLeaf::CycleSink {
                ident,
                location_kind: self.location_kind(),
                input: Box::new(HydroNode::Unpersist {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: metadata.clone(),
                }),
                metadata,
            });
    }
}
258
impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Internal constructor: wraps a raw [`HydroNode`] in a typed `Stream`
    /// handle at `location`. The type parameters are phantom only, so callers
    /// are responsible for ensuring the node actually produces values of `T`
    /// with the claimed ordering/retry guarantees.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        Stream {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
}
271
impl<'a, T, L, B, O, R> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        // On the first clone, retarget this stream's IR node to a shared `Tee`
        // so that the original handle and all clones read from the same
        // underlying node. `replace` temporarily swaps in a `Placeholder`
        // while the original node is moved into the `Tee`.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata::<T>(),
            };
        }

        // The node is now guaranteed to be a `Tee`; hand out another handle
        // sharing the same inner `Rc`.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
301
impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Produces a stream based on invoking `f` on each element in order.
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`Stream::inspect`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["hello", "world"]));
    /// words.map(q!(|x| x.to_uppercase()))
    /// # }, |mut stream| async move {
    /// # for w in vec!["HELLO", "WORLD"] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        F: Fn(T) -> U + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::Map {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<U>(),
            },
        )
    }

    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
    /// for the output type `U` must produce items in a **deterministic** order.
    ///
    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    /// .flat_map_ordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<U>(),
            },
        )
    }

    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `U` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{*, stream::ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    /// .source_iter(q!(vec![
    /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    /// std::collections::HashSet::from_iter(vec![3, 4]),
    /// ]))
    /// .flat_map_unordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<U>(),
            },
        )
    }

    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    /// .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|d| d))
    }

    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{*, stream::ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    /// .source_iter(q!(vec![
    /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    /// std::collections::HashSet::from_iter(vec![3, 4]),
    /// ]))
    /// .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|d| d))
    }

    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
    /// `f`, preserving the order of the elements.
    ///
    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`Stream::filter_map`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    /// .source_iter(q!(vec![1, 2, 3, 4]))
    /// .filter(q!(|&x| x > 2))
    /// # }, |mut stream| async move {
    /// // 3, 4
    /// # for w in (3..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
    where
        F: Fn(&T) -> bool + 'a,
    {
        let f = f.splice_fn1_borrow_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::Filter {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
    /// .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # for w in (1..3) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        F: Fn(T) -> Option<U> + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<U>(),
            },
        )
    }

    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
    /// where `x` is the final value of `other`, a bounded [`Singleton`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = unsafe {
    /// process
    /// .source_iter(q!(vec![1, 2, 3, 4]))
    /// .tick_batch(&tick)
    /// };
    /// let count = batch.clone().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 4), (2, 4), (3, 4), (4, 4)
    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> Stream<(T, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(T, O2)>(),
            },
        )
    }

    /// Allow this stream through if the argument (a Bounded Optional) is non-empty, otherwise the output is empty.
    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        self.cross_singleton(signal.map(q!(|_u| ())))
            .map(q!(|(d, _signal)| d))
    }

    /// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty.
    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
    }

    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
    /// tupled pairs in a non-deterministic order.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
    /// stream1.cross_product(stream2)
    /// # }, |mut stream| async move {
    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// ```
    pub fn cross_product<O2>(
        self,
        other: Stream<O2, L, B, O, R>,
    ) -> Stream<(T, O2), L, B, NoOrder, R>
    where
        T: Clone,
        O2: Clone,
    {
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossProduct {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(T, O2)>(),
            },
        )
    }

    /// Takes one stream as input and filters out any duplicate occurrences. The output
    /// contains all unique values from the input.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
    where
        T: Eq + Hash,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Unique {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Outputs everything in this stream that is *not* contained in the `other` stream.
    ///
    /// The `other` stream must be [`Bounded`], since this function will wait until
    /// all its elements are available before producing any output.
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = unsafe {
    /// process
    /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
    /// .tick_batch(&tick)
    /// };
    /// let batch = unsafe {
    /// process
    /// .source_iter(q!(vec![1, 2]))
    /// .tick_batch(&tick)
    /// };
    /// stream.filter_not_in(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_not_in<O2>(
        self,
        other: Stream<T, L, Bounded, O2, R>,
    ) -> Stream<T, L, Bounded, O, R>
    where
        T: Eq + Hash,
    {
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Difference {
                pos: Box::new(self.ir_node.into_inner()),
                neg: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each item. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let nums = process.source_iter(q!(vec![1, 2]));
    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
    where
        F: Fn(&T) + 'a,
    {
        let f = f.splice_fn1_borrow_ctx(&self.location).into();

        if L::is_top_level() {
            // At a top-level location, the stream is unpersisted before the
            // inspect and re-persisted afterwards, so `f` sees each raw element.
            Stream::new(
                self.location.clone(),
                HydroNode::Persist {
                    inner: Box::new(HydroNode::Inspect {
                        f,
                        input: Box::new(HydroNode::Unpersist {
                            inner: Box::new(self.ir_node.into_inner()),
                            metadata: self.location.new_node_metadata::<T>(),
                        }),
                        metadata: self.location.new_node_metadata::<T>(),
                    }),
                    metadata: self.location.new_node_metadata::<T>(),
                },
            )
        } else {
            Stream::new(
                self.location.clone(),
                HydroNode::Inspect {
                    f,
                    input: Box::new(self.ir_node.into_inner()),
                    metadata: self.location.new_node_metadata::<T>(),
                },
            )
        }
    }

    /// Explicitly "casts" the stream to a type with a different ordering
    /// guarantee. Useful in unsafe code where the ordering cannot be proven
    /// by the type-system.
    ///
    /// # Safety
    /// This function is used as an escape hatch, and any mistakes in the
    /// provided ordering guarantee will propagate into the guarantees
    /// for the rest of the program.
    ///
    /// # Example
    /// # TODO: more sensible code after Shadaj merges
    /// ```rust
    /// # use hydro_lang::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let nums = process.source_iter(q!({
    /// let now = std::time::SystemTime::now();
    /// match now.elapsed().unwrap().as_secs() % 2 {
    /// 0 => vec![5, 4, 3, 2, 1],
    /// _ => vec![1, 2, 3, 4, 5],
    /// }
    /// .into_iter()
    /// }));
    /// // despite being generated by `source_iter`, the order of `nums` across runs is non-deterministic
    /// let stream = unsafe { nums.assume_ordering::<NoOrder>() };
    /// stream
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4, 5] {
    /// # assert!((1..=5).contains(&stream.next().await.unwrap()));
    /// # }
    /// # }));
    /// ```
    pub unsafe fn assume_ordering<O2>(self) -> Stream<T, L, B, O2, R> {
        Stream::new(self.location, self.ir_node.into_inner())
    }

    /// Explicitly "casts" the stream to a type with a different retries
    /// guarantee. Useful in unsafe code where the lack of retries cannot
    /// be proven by the type-system.
    ///
    /// # Safety
    /// This function is used as an escape hatch, and any mistakes in the
    /// provided retries guarantee will propagate into the guarantees
    /// for the rest of the program.
    pub unsafe fn assume_retries<R2>(self) -> Stream<T, L, B, O, R2> {
        Stream::new(self.location, self.ir_node.into_inner())
    }

    /// Weakens this stream's retry guarantee to [`AtLeastOnce`], the weakest
    /// guarantee. This is always safe, since it only discards type-level
    /// information rather than asserting any new property.
    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
        unsafe {
            // SAFETY: this is a weaker retry guarantee, so it is safe to assume
            self.assume_retries::<AtLeastOnce>()
        }
    }
}
814
impl<'a, T, L, B, O, R> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// This is useful when the stream is produced from a borrowing source,
    /// such as `source_iter` over a slice (as in the example below).
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        self.map(q!(|d| d.clone()))
    }
}
841
impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
where
    L: Location<'a>,
    O: MinOrder<NoOrder, Min = NoOrder>,
{
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
    /// and there may be duplicates.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let bools = process.source_iter(q!(vec![false, true, false]));
    /// let batch = unsafe { bools.tick_batch(&tick) };
    /// batch
    /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
    /// .all_ticks()
    /// # }, |mut stream| async move {
    /// // true
    /// # assert_eq!(stream.next().await.unwrap(), true);
    /// # }));
    /// ```
    pub fn fold_commutative_idempotent<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L>,
    ) -> Singleton<A, L, B>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
    {
        unsafe {
            // SAFETY: the combinator function is idempotent
            self.assume_retries()
        }
        .fold_commutative(init, comb) // the combinator function is commutative
    }

    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
    /// reference, so that it can be modified in place.
    ///
    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
    /// and there may be duplicates.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let bools = process.source_iter(q!(vec![false, true, false]));
    /// let batch = unsafe { bools.tick_batch(&tick) };
    /// batch
    /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
    /// .all_ticks()
    /// # }, |mut stream| async move {
    /// // true
    /// # assert_eq!(stream.next().await.unwrap(), true);
    /// # }));
    /// ```
    pub fn reduce_commutative_idempotent<F>(
        self,
        comb: impl IntoQuotedMut<'a, F, L>,
    ) -> Optional<T, L, B>
    where
        F: Fn(&mut T, T) + 'a,
    {
        unsafe {
            // SAFETY: the combinator function is idempotent
            self.assume_retries()
        }
        .reduce_commutative(comb) // the combinator function is commutative
    }

    /// Computes the maximum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.max().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// ```
    pub fn max(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // Taking the larger of two values is both commutative and idempotent.
        self.reduce_commutative_idempotent(q!(|curr, new| {
            if new > *curr {
                *curr = new;
            }
        }))
    }

    /// Computes the minimum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.min().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// ```
    pub fn min(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // Taking the smaller of two values is both commutative and idempotent.
        self.reduce_commutative_idempotent(q!(|curr, new| {
            if new < *curr {
                *curr = new;
            }
        }))
    }
}
980
impl<'a, T, L, B, O> Stream<T, L, B, O, ExactlyOnce>
where
    L: Location<'a>,
    O: MinOrder<NoOrder, Min = NoOrder>,
{
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // 10
    /// # assert_eq!(stream.next().await.unwrap(), 10);
    /// # }));
    /// ```
    pub fn fold_commutative<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L>,
    ) -> Singleton<A, L, B>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
    {
        // SAFETY: `comb` is commutative, so pretending the stream is
        // totally ordered cannot change the final aggregate.
        let ordered = unsafe { self.assume_ordering() };
        ordered.fold(init, comb)
    }
1023
1024 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1025 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1026 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1027 /// reference, so that it can be modified in place.
1028 ///
1029 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1030 ///
1031 /// # Example
1032 /// ```rust
1033 /// # use hydro_lang::*;
1034 /// # use futures::StreamExt;
1035 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1036 /// let tick = process.tick();
1037 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1038 /// let batch = unsafe { numbers.tick_batch(&tick) };
1039 /// batch
1040 /// .reduce_commutative(q!(|curr, new| *curr += new))
1041 /// .all_ticks()
1042 /// # }, |mut stream| async move {
1043 /// // 10
1044 /// # assert_eq!(stream.next().await.unwrap(), 10);
1045 /// # }));
1046 /// ```
1047 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1048 where
1049 F: Fn(&mut T, T) + 'a,
1050 {
1051 unsafe {
1052 // SAFETY: the combinator function is commutative
1053 self.assume_ordering()
1054 }
1055 .reduce(comb)
1056 }
1057
1058 /// Computes the maximum element in the stream as an [`Optional`], where the
1059 /// maximum is determined according to the `key` function. The [`Optional`] will
1060 /// be empty until the first element in the input arrives.
1061 ///
1062 /// # Example
1063 /// ```rust
1064 /// # use hydro_lang::*;
1065 /// # use futures::StreamExt;
1066 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1067 /// let tick = process.tick();
1068 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1069 /// let batch = unsafe { numbers.tick_batch(&tick) };
1070 /// batch.max_by_key(q!(|x| -x)).all_ticks()
1071 /// # }, |mut stream| async move {
1072 /// // 1
1073 /// # assert_eq!(stream.next().await.unwrap(), 1);
1074 /// # }));
1075 /// ```
1076 pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
1077 where
1078 K: Ord,
1079 F: Fn(&T) -> K + 'a,
1080 {
1081 let f = key.splice_fn1_borrow_ctx(&self.location);
1082
1083 let wrapped: syn::Expr = parse_quote!({
1084 let key_fn = #f;
1085 move |curr, new| {
1086 if key_fn(&new) > key_fn(&*curr) {
1087 *curr = new;
1088 }
1089 }
1090 });
1091
1092 let mut core = HydroNode::Reduce {
1093 f: wrapped.into(),
1094 input: Box::new(self.ir_node.into_inner()),
1095 metadata: self.location.new_node_metadata::<T>(),
1096 };
1097
1098 if L::is_top_level() {
1099 core = HydroNode::Persist {
1100 inner: Box::new(core),
1101 metadata: self.location.new_node_metadata::<T>(),
1102 };
1103 }
1104
1105 Optional::new(self.location, core)
1106 }
1107
1108 /// Computes the number of elements in the stream as a [`Singleton`].
1109 ///
1110 /// # Example
1111 /// ```rust
1112 /// # use hydro_lang::*;
1113 /// # use futures::StreamExt;
1114 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1115 /// let tick = process.tick();
1116 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1117 /// let batch = unsafe { numbers.tick_batch(&tick) };
1118 /// batch.count().all_ticks()
1119 /// # }, |mut stream| async move {
1120 /// // 4
1121 /// # assert_eq!(stream.next().await.unwrap(), 4);
1122 /// # }));
1123 /// ```
1124 pub fn count(self) -> Singleton<usize, L, B> {
1125 self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1126 }
1127}
1128
impl<'a, T, L, B> Stream<T, L, B, TotalOrder, ExactlyOnce>
where
    L: Location<'a>,
{
    /// Returns a stream with the current count tupled with each element in the input stream.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{*, stream::ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// numbers.enumerate()
    /// # }, |mut stream| async move {
    /// // (0, 1), (1, 2), (2, 3), (3, 4)
    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
        let location = self.location.clone();

        // At the top level, the stream is logically persisted; unwrap the
        // persistence, enumerate the raw elements, and re-persist the result.
        // Inside a tick, enumerate the batch directly.
        let node = if L::is_top_level() {
            HydroNode::Persist {
                inner: Box::new(HydroNode::Enumerate {
                    is_static: true,
                    input: Box::new(HydroNode::Unpersist {
                        inner: Box::new(self.ir_node.into_inner()),
                        metadata: location.new_node_metadata::<T>(),
                    }),
                    metadata: location.new_node_metadata::<(usize, T)>(),
                }),
                metadata: location.new_node_metadata::<(usize, T)>(),
            }
        } else {
            HydroNode::Enumerate {
                is_static: false,
                input: Box::new(self.ir_node.into_inner()),
                metadata: location.new_node_metadata::<(usize, T)>(),
            }
        };

        Stream::new(location, node)
    }
1177
    /// Computes the first element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
    /// re-ordering of elements may cause the first element to change.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.first().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// ```
    pub fn first(self) -> Optional<T, L, B> {
        // No transformation node is inserted: the Optional wraps the stream's
        // existing IR node directly.
        Optional::new(self.location, self.ir_node.into_inner())
    }
1201
1202 /// Computes the last element in the stream as an [`Optional`], which
1203 /// will be empty until an element in the input arrives.
1204 ///
1205 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1206 /// re-ordering of elements may cause the last element to change.
1207 ///
1208 /// # Example
1209 /// ```rust
1210 /// # use hydro_lang::*;
1211 /// # use futures::StreamExt;
1212 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1213 /// let tick = process.tick();
1214 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1215 /// let batch = unsafe { numbers.tick_batch(&tick) };
1216 /// batch.last().all_ticks()
1217 /// # }, |mut stream| async move {
1218 /// // 4
1219 /// # assert_eq!(stream.next().await.unwrap(), 4);
1220 /// # }));
1221 /// ```
1222 pub fn last(self) -> Optional<T, L, B> {
1223 self.reduce(q!(|curr, new| *curr = new))
1224 }
1225
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
    /// to depend on the order of elements in the stream.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
    /// let batch = unsafe { words.tick_batch(&tick) };
    /// batch
    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // "HELLOWORLD"
    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
    /// # }));
    /// ```
    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L>,
    ) -> Singleton<A, L, B> {
        let init = init.splice_fn0_ctx(&self.location).into();
        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        let mut core = HydroNode::Fold {
            init,
            acc: comb,
            input: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata::<A>(),
        };

        if L::is_top_level() {
            // top-level (possibly unbounded) singletons are represented as
            // a stream which produces all values from all ticks every tick,
            // so Unpersist will always give the latest aggregation
            core = HydroNode::Persist {
                inner: Box::new(core),
                metadata: self.location.new_node_metadata::<A>(),
            };
        }

        Singleton::new(self.location, core)
    }
1276
1277 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1278 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1279 /// until the first element in the input arrives.
1280 ///
1281 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1282 /// to depend on the order of elements in the stream.
1283 ///
1284 /// # Example
1285 /// ```rust
1286 /// # use hydro_lang::*;
1287 /// # use futures::StreamExt;
1288 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1289 /// let tick = process.tick();
1290 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1291 /// let batch = unsafe { words.tick_batch(&tick) };
1292 /// batch
1293 /// .map(q!(|x| x.to_string()))
1294 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1295 /// .all_ticks()
1296 /// # }, |mut stream| async move {
1297 /// // "HELLOWORLD"
1298 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1299 /// # }));
1300 /// ```
1301 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1302 self,
1303 comb: impl IntoQuotedMut<'a, F, L>,
1304 ) -> Optional<T, L, B> {
1305 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1306 let mut core = HydroNode::Reduce {
1307 f,
1308 input: Box::new(self.ir_node.into_inner()),
1309 metadata: self.location.new_node_metadata::<T>(),
1310 };
1311
1312 if L::is_top_level() {
1313 core = HydroNode::Persist {
1314 inner: Box::new(core),
1315 metadata: self.location.new_node_metadata::<T>(),
1316 };
1317 }
1318
1319 Optional::new(self.location, core)
1320 }
1321}
1322
1323impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O, R> Stream<T, L, Unbounded, O, R> {
1324 /// Produces a new stream that interleaves the elements of the two input streams.
1325 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1326 ///
1327 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1328 /// [`Bounded`], you can use [`Stream::chain`] instead.
1329 ///
1330 /// # Example
1331 /// ```rust
1332 /// # use hydro_lang::*;
1333 /// # use futures::StreamExt;
1334 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1335 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1336 /// numbers.clone().map(q!(|x| x + 1)).union(numbers)
1337 /// # }, |mut stream| async move {
1338 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1339 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1340 /// # assert_eq!(stream.next().await.unwrap(), w);
1341 /// # }
1342 /// # }));
1343 /// ```
1344 pub fn union<O2, R2: MinRetries<R>>(
1345 self,
1346 other: Stream<T, L, Unbounded, O2, R2>,
1347 ) -> Stream<T, L, Unbounded, NoOrder, R2::Min> {
1348 let tick = self.location.tick();
1349 unsafe {
1350 // SAFETY: Because the outputs are unordered,
1351 // we can interleave batches from both streams.
1352 self.tick_batch(&tick)
1353 .assume_ordering::<NoOrder>()
1354 .assume_retries::<R2::Min>()
1355 .chain(
1356 other
1357 .tick_batch(&tick)
1358 .assume_ordering::<NoOrder>()
1359 .assume_retries::<R2::Min>(),
1360 )
1361 .all_ticks()
1362 .assume_ordering()
1363 }
1364 }
1365}
1366
impl<'a, T, L, O, R> Stream<T, L, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Produces a new stream that emits the input elements in sorted order.
    ///
    /// The input stream can have any ordering guarantee, but the output stream
    /// will have a [`TotalOrder`] guarantee. This operator will block until all
    /// elements in the input stream are available, so it requires the input stream
    /// to be [`Bounded`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.sort().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
    where
        T: Ord,
    {
        let location = self.location.clone();
        let node = HydroNode::Sort {
            input: Box::new(self.ir_node.into_inner()),
            metadata: location.new_node_metadata::<T>(),
        };
        Stream::new(location, node)
    }
1406
1407 /// Produces a new stream that first emits the elements of the `self` stream,
1408 /// and then emits the elements of the `other` stream. The output stream has
1409 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1410 /// [`TotalOrder`] guarantee.
1411 ///
1412 /// Currently, both input streams must be [`Bounded`]. This operator will block
1413 /// on the first stream until all its elements are available. In a future version,
1414 /// we will relax the requirement on the `other` stream.
1415 ///
1416 /// # Example
1417 /// ```rust
1418 /// # use hydro_lang::*;
1419 /// # use futures::StreamExt;
1420 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1421 /// let tick = process.tick();
1422 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1423 /// let batch = unsafe { numbers.tick_batch(&tick) };
1424 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1425 /// # }, |mut stream| async move {
1426 /// // 2, 3, 4, 5, 1, 2, 3, 4
1427 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1428 /// # assert_eq!(stream.next().await.unwrap(), w);
1429 /// # }
1430 /// # }));
1431 /// ```
1432 pub fn chain<O2>(self, other: Stream<T, L, Bounded, O2, R>) -> Stream<T, L, Bounded, O::Min, R>
1433 where
1434 O: MinOrder<O2>,
1435 {
1436 check_matching_location(&self.location, &other.location);
1437
1438 Stream::new(
1439 self.location.clone(),
1440 HydroNode::Chain {
1441 first: Box::new(self.ir_node.into_inner()),
1442 second: Box::new(other.ir_node.into_inner()),
1443 metadata: self.location.new_node_metadata::<T>(),
1444 },
1445 )
1446 }
1447}
1448
impl<'a, K, V1, L, B, O, R> Stream<(K, V1), L, B, O, R>
where
    L: Location<'a>,
{
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// ```
    pub fn join<V2, O2>(
        self,
        n: Stream<(K, V2), L, B, O2, R>,
    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, R>
    where
        K: Eq + Hash,
    {
        // Joins are only defined between streams at the same location.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Join {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(n.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
            },
        )
    }
1489
    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = unsafe {
    ///    process
    ///    .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///    .tick_batch(&tick)
    /// };
    /// let batch = unsafe {
    ///     process
    ///         .source_iter(q!(vec![1, 2]))
    ///         .tick_batch(&tick)
    /// };
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn anti_join<O2>(self, n: Stream<K, L, Bounded, O2, R>) -> Stream<(K, V1), L, B, O, R>
    where
        K: Eq + Hash,
    {
        // Both sides of the anti-join must live at the same location.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                // `pos` keeps its elements unless `neg` contains a matching key.
                pos: Box::new(self.ir_node.into_inner()),
                neg: Box::new(n.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, V1)>(),
            },
        )
    }
}
1533
impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
where
    K: Eq + Hash,
    L: Location<'a>,
{
    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
    /// in the second element are accumulated via the `comb` closure.
    ///
    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
    /// to depend on the order of elements in the stream.
    ///
    /// If the input and output value types are the same and do not require initialization then use
    /// [`Stream::reduce_keyed`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 5), (2, 7)
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
    /// # }));
    /// ```
    pub fn fold_keyed<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, Tick<L>>,
        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) + 'a,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        let acc = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        let location = self.location.clone();
        let node = HydroNode::FoldKeyed {
            init,
            acc,
            input: Box::new(self.ir_node.into_inner()),
            metadata: location.new_node_metadata::<(K, A)>(),
        };
        Stream::new(location, node)
    }
1588
1589 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1590 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1591 /// in the second element are accumulated via the `comb` closure.
1592 ///
1593 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1594 /// to depend on the order of elements in the stream.
1595 ///
1596 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1597 ///
1598 /// # Example
1599 /// ```rust
1600 /// # use hydro_lang::*;
1601 /// # use futures::StreamExt;
1602 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1603 /// let tick = process.tick();
1604 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1605 /// let batch = unsafe { numbers.tick_batch(&tick) };
1606 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1607 /// # }, |mut stream| async move {
1608 /// // (1, 5), (2, 7)
1609 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1610 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1611 /// # }));
1612 /// ```
1613 pub fn reduce_keyed<F>(
1614 self,
1615 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1616 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1617 where
1618 F: Fn(&mut V, V) + 'a,
1619 {
1620 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1621
1622 Stream::new(
1623 self.location.clone(),
1624 HydroNode::ReduceKeyed {
1625 f,
1626 input: Box::new(self.ir_node.into_inner()),
1627 metadata: self.location.new_node_metadata::<(K, V)>(),
1628 },
1629 )
1630 }
1631}
1632
impl<'a, K, V, L, O> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
where
    K: Eq + Hash,
    L: Location<'a>,
{
    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
    /// in the second element are accumulated via the `comb` closure.
    ///
    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
    ///
    /// If the input and output value types are the same and do not require initialization then use
    /// [`Stream::reduce_keyed_commutative`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 5), (2, 7)
    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
    /// # }));
    /// ```
    pub fn fold_keyed_commutative<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, Tick<L>>,
        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) + 'a,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        let acc = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        let location = self.location.clone();
        let node = HydroNode::FoldKeyed {
            init,
            acc,
            input: Box::new(self.ir_node.into_inner()),
            metadata: location.new_node_metadata::<(K, A)>(),
        };
        Stream::new(location, node)
    }
1686
1687 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
1688 /// # Example
1689 /// ```rust
1690 /// # use hydro_lang::*;
1691 /// # use futures::StreamExt;
1692 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1693 /// let tick = process.tick();
1694 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1695 /// let batch = unsafe { numbers.tick_batch(&tick) };
1696 /// batch.keys().all_ticks()
1697 /// # }, |mut stream| async move {
1698 /// // 1, 2
1699 /// # assert_eq!(stream.next().await.unwrap(), 1);
1700 /// # assert_eq!(stream.next().await.unwrap(), 2);
1701 /// # }));
1702 /// ```
1703 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
1704 self.fold_keyed_commutative(q!(|| ()), q!(|_, _| {}))
1705 .map(q!(|(k, _)| k))
1706 }
1707
1708 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1709 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1710 /// in the second element are accumulated via the `comb` closure.
1711 ///
1712 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1713 ///
1714 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
1715 ///
1716 /// # Example
1717 /// ```rust
1718 /// # use hydro_lang::*;
1719 /// # use futures::StreamExt;
1720 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1721 /// let tick = process.tick();
1722 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1723 /// let batch = unsafe { numbers.tick_batch(&tick) };
1724 /// batch
1725 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
1726 /// .all_ticks()
1727 /// # }, |mut stream| async move {
1728 /// // (1, 5), (2, 7)
1729 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1730 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1731 /// # }));
1732 /// ```
1733 pub fn reduce_keyed_commutative<F>(
1734 self,
1735 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1736 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1737 where
1738 F: Fn(&mut V, V) + 'a,
1739 {
1740 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1741
1742 Stream::new(
1743 self.location.clone(),
1744 HydroNode::ReduceKeyed {
1745 f,
1746 input: Box::new(self.ir_node.into_inner()),
1747 metadata: self.location.new_node_metadata::<(K, V)>(),
1748 },
1749 )
1750 }
1751}
1752
impl<'a, T, L, B, O, R> Stream<T, Atomic<L>, B, O, R>
where
    L: Location<'a> + NoTick,
{
    /// Returns a stream corresponding to the latest batch of elements being atomically
    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Safety
    /// The batch boundaries are non-deterministic and may change across executions.
    pub unsafe fn tick_batch(self) -> Stream<T, Tick<L>, Bounded, O, R> {
        // Moving to the tick location and stripping persistence yields the
        // current batch of elements.
        let tick_location = self.location.clone().tick;
        let node = HydroNode::Unpersist {
            inner: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata::<T>(),
        };
        Stream::new(tick_location, node)
    }
1772
    /// Exits the atomic context, returning the stream at the underlying
    /// (non-atomic) location. The IR node is passed through unchanged.
    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
        Stream::new(self.location.tick.l, self.ir_node.into_inner())
    }
1776
    /// Returns the [`Tick`] associated with this atomic section.
    pub fn atomic_source(&self) -> Tick<L> {
        self.location.tick.clone()
    }
}
1781
impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick + NoAtomic,
{
    /// Marks this stream's location as [`Atomic`] with respect to `tick`;
    /// the underlying IR node is unchanged. Use [`Stream::end_atomic`] to exit.
    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
        Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
    }
1789
    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced as available, regardless of input arrival order.
    ///
    /// # Example
    /// ```rust
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::*;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures()
    /// # },
    /// # |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// # let mut output = HashSet::new();
    /// # for _ in 1..10 {
    /// #     output.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     HashSet::<i32>::from_iter(1..10)
    /// # );
    /// # },
    /// # ));
    /// ```
    pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>
    where
        T: Future<Output = T2>,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFutures {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T2>(),
            },
        )
    }
1830
    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Safety
    /// The batch boundaries are non-deterministic and may change across executions.
    pub unsafe fn tick_batch(self, tick: &Tick<L>) -> Stream<T, Tick<L>, Bounded, O, R> {
        // Enter the atomic context for `tick`, then take its batch view.
        unsafe { self.atomic(tick).tick_batch() }
    }
1840
1841 /// Given a time interval, returns a stream corresponding to samples taken from the
1842 /// stream roughly at that interval. The output will have elements in the same order
1843 /// as the input, but with arbitrary elements skipped between samples. There is also
1844 /// no guarantee on the exact timing of the samples.
1845 ///
1846 /// # Safety
1847 /// The output stream is non-deterministic in which elements are sampled, since this
1848 /// is controlled by a clock.
1849 pub unsafe fn sample_every(
1850 self,
1851 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1852 ) -> Stream<T, L, Unbounded, O, AtLeastOnce> {
1853 let samples = unsafe {
1854 // SAFETY: source of intentional non-determinism
1855 self.location.source_interval(interval)
1856 };
1857
1858 let tick = self.location.tick();
1859 unsafe {
1860 // SAFETY: source of intentional non-determinism
1861 self.tick_batch(&tick)
1862 .continue_if(samples.tick_batch(&tick).first())
1863 .all_ticks()
1864 .weakest_retries()
1865 }
1866 }
1867
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Safety
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub unsafe fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
    ) -> Optional<(), L, Unbounded>
    where
        O: MinOrder<NoOrder, Min = NoOrder>,
    {
        let tick = self.location.tick();

        // Track the wall-clock time of the most recent arrival. Every arrival
        // overwrites the timestamp, so element order does not matter.
        let latest_received = unsafe { self.assume_retries() }.fold_commutative(
            q!(|| None),
            q!(|latest, _| {
                *latest = Some(Instant::now());
            }),
        );

        // Emit `Some(())` (timeout) when nothing has arrived yet, or when the
        // last arrival is older than `duration`; `latest()` exposes the most
        // recent determination as an unbounded Optional.
        unsafe {
            // SAFETY: Non-deterministic delay in detecting a timeout is expected.
            latest_received.latest_tick(&tick)
        }
        .filter_map(q!(move |latest_received| {
            if let Some(latest_received) = latest_received {
                if Instant::now().duration_since(latest_received) > duration {
                    Some(())
                } else {
                    None
                }
            } else {
                Some(())
            }
        }))
        .latest()
    }
}
1911
impl<'a, F, T, L, B, O, R> Stream<F, L, B, O, R>
where
    L: Location<'a> + NoTick + NoAtomic,
    F: Future<Output = T>,
{
    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced in the same order as the input stream.
    ///
    /// # Example
    /// ```rust
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::*;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_ordered()
    /// # },
    /// # |mut stream| async move {
    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
    /// # let mut output = Vec::new();
    /// # for _ in 1..10 {
    /// #     output.push(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
    /// # );
    /// # },
    /// # ));
    /// ```
    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
        // Wrap the input in a ResolveFuturesOrdered IR node; ordering of outputs
        // follows the input stream (unlike an unordered resolve).
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesOrdered {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }
}
1955
1956impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
1957where
1958 L: Location<'a> + NoTick,
1959{
1960 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
1961 let f = f.splice_fn1_ctx(&self.location).into();
1962 let metadata = self.location.new_node_metadata::<T>();
1963 self.location
1964 .flow_state()
1965 .borrow_mut()
1966 .leaves
1967 .as_mut()
1968 .expect(FLOW_USED_MESSAGE)
1969 .push(HydroLeaf::ForEach {
1970 input: Box::new(HydroNode::Unpersist {
1971 inner: Box::new(self.ir_node.into_inner()),
1972 metadata: metadata.clone(),
1973 }),
1974 f,
1975 metadata,
1976 });
1977 }
1978
1979 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1980 where
1981 S: 'a + futures::Sink<T> + Unpin,
1982 {
1983 self.location
1984 .flow_state()
1985 .borrow_mut()
1986 .leaves
1987 .as_mut()
1988 .expect(FLOW_USED_MESSAGE)
1989 .push(HydroLeaf::DestSink {
1990 sink: sink.splice_typed_ctx(&self.location).into(),
1991 input: Box::new(self.ir_node.into_inner()),
1992 metadata: self.location.new_node_metadata::<T>(),
1993 });
1994 }
1995}
1996
1997impl<'a, T, L, O, R> Stream<T, Tick<L>, Bounded, O, R>
1998where
1999 L: Location<'a>,
2000{
2001 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2002 Stream::new(
2003 self.location.outer().clone(),
2004 HydroNode::Persist {
2005 inner: Box::new(self.ir_node.into_inner()),
2006 metadata: self.location.new_node_metadata::<T>(),
2007 },
2008 )
2009 }
2010
2011 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2012 Stream::new(
2013 Atomic {
2014 tick: self.location.clone(),
2015 },
2016 HydroNode::Persist {
2017 inner: Box::new(self.ir_node.into_inner()),
2018 metadata: self.location.new_node_metadata::<T>(),
2019 },
2020 )
2021 }
2022
2023 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2024 where
2025 T: Clone,
2026 {
2027 Stream::new(
2028 self.location.clone(),
2029 HydroNode::Persist {
2030 inner: Box::new(self.ir_node.into_inner()),
2031 metadata: self.location.new_node_metadata::<T>(),
2032 },
2033 )
2034 }
2035
2036 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2037 Stream::new(
2038 self.location.clone(),
2039 HydroNode::DeferTick {
2040 input: Box::new(self.ir_node.into_inner()),
2041 metadata: self.location.new_node_metadata::<T>(),
2042 },
2043 )
2044 }
2045
2046 pub fn delta(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2047 Stream::new(
2048 self.location.clone(),
2049 HydroNode::Delta {
2050 inner: Box::new(self.ir_node.into_inner()),
2051 metadata: self.location.new_node_metadata::<T>(),
2052 },
2053 )
2054 }
2055}
2056
2057pub fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
2058 let root = get_this_crate();
2059
2060 if is_demux {
2061 parse_quote! {
2062 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::ClusterId<_>, #t_type), _>(
2063 |(id, data)| {
2064 (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
2065 }
2066 )
2067 }
2068 } else {
2069 parse_quote! {
2070 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
2071 |data| {
2072 #root::runtime_support::bincode::serialize(&data).unwrap().into()
2073 }
2074 )
2075 }
2076 }
2077}
2078
2079fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
2080 serialize_bincode_with_type(is_demux, &stageleft::quote_type::<T>())
2081}
2082
2083pub fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
2084 let root = get_this_crate();
2085
2086 if let Some(c_type) = tagged {
2087 parse_quote! {
2088 |res| {
2089 let (id, b) = res.unwrap();
2090 (#root::ClusterId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
2091 }
2092 }
2093 } else {
2094 parse_quote! {
2095 |res| {
2096 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
2097 }
2098 }
2099 }
2100}
2101
2102pub(super) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
2103 deserialize_bincode_with_type(tagged, &stageleft::quote_type::<T>())
2104}
2105
2106impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
2107where
2108 L: Location<'a> + NoTick,
2109{
2110 #[expect(
2111 clippy::type_complexity,
2112 reason = "Complex signatures for CanSend trait"
2113 )]
2114 pub fn send_bincode<L2, CoreType>(
2115 self,
2116 other: &L2,
2117 ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<CoreType>, L2, Unbounded, O::Min, R>
2118 where
2119 L::Root: CanSend<'a, L2, In<CoreType> = T>,
2120 L2: Location<'a>,
2121 CoreType: Serialize + DeserializeOwned,
2122 O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2123 {
2124 let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::Root::is_demux()));
2125
2126 let deserialize_pipeline = Some(deserialize_bincode::<CoreType>(
2127 L::Root::tagged_type().as_ref(),
2128 ));
2129
2130 Stream::new(
2131 other.clone(),
2132 HydroNode::Network {
2133 from_key: None,
2134 to_location: other.id(),
2135 to_key: None,
2136 serialize_fn: serialize_pipeline.map(|e| e.into()),
2137 instantiate_fn: DebugInstantiate::Building,
2138 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
2139 input: Box::new(self.ir_node.into_inner()),
2140 metadata: other.new_node_metadata::<CoreType>(),
2141 },
2142 )
2143 }
2144
    /// Sends this stream to an external (non-Hydro) process, returning a port
    /// handle through which the external process can receive bincode-encoded values.
    pub fn send_bincode_external<L2, CoreType>(
        self,
        other: &ExternalProcess<L2>,
    ) -> ExternalBincodeStream<L::Out<CoreType>>
    where
        L: CanSend<'a, ExternalProcess<'a, L2>, In<CoreType> = T, Out<CoreType> = CoreType>,
        L2: 'a,
        CoreType: Serialize + DeserializeOwned,
        // for now, we restrict Out<CoreType> to be CoreType, which means no tagged cluster -> external
    {
        let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::is_demux()));

        let metadata = other.new_node_metadata::<CoreType>();

        let mut flow_state_borrow = self.location.flow_state().borrow_mut();

        // Allocate a fresh key that identifies this external output port.
        let external_key = flow_state_borrow.next_external_out;
        flow_state_borrow.next_external_out += 1;

        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");

        // The leaf needs an `f` expression, but nothing runs on this side; use a dummy.
        let dummy_f: syn::Expr = syn::parse_quote!(());

        leaves.push(HydroLeaf::ForEach {
            f: dummy_f.into(),
            input: Box::new(HydroNode::Network {
                from_key: None,
                to_location: other.id(),
                to_key: Some(external_key),
                serialize_fn: serialize_pipeline.map(|e| e.into()),
                instantiate_fn: DebugInstantiate::Building,
                // No deserialization in the graph; decoding is left to the external receiver.
                deserialize_fn: None,
                input: Box::new(self.ir_node.into_inner()),
                metadata: metadata.clone(),
            }),
            metadata,
        });

        ExternalBincodeStream {
            process_id: other.id,
            port_id: external_key,
            _phantom: PhantomData,
        }
    }
2189
2190 #[expect(
2191 clippy::type_complexity,
2192 reason = "Complex signatures for CanSend trait"
2193 )]
2194 pub fn send_bytes<L2>(
2195 self,
2196 other: &L2,
2197 ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<Bytes>, L2, Unbounded, O::Min, R>
2198 where
2199 L2: Location<'a>,
2200 L::Root: CanSend<'a, L2, In<Bytes> = T>,
2201 O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2202 {
2203 let root = get_this_crate();
2204 Stream::new(
2205 other.clone(),
2206 HydroNode::Network {
2207 from_key: None,
2208 to_location: other.id(),
2209 to_key: None,
2210 serialize_fn: None,
2211 instantiate_fn: DebugInstantiate::Building,
2212 deserialize_fn: if let Some(c_type) = L::Root::tagged_type() {
2213 let expr: syn::Expr = parse_quote!(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()));
2214 Some(expr.into())
2215 } else {
2216 let expr: syn::Expr = parse_quote!(|b| b.unwrap().freeze());
2217 Some(expr.into())
2218 },
2219 input: Box::new(self.ir_node.into_inner()),
2220 metadata: other.new_node_metadata::<Bytes>(),
2221 },
2222 )
2223 }
2224
    /// Sends this stream of raw bytes to an external (non-Hydro) process, returning
    /// a port handle through which the external process can receive the bytes.
    pub fn send_bytes_external<L2>(self, other: &ExternalProcess<L2>) -> ExternalBytesPort
    where
        L2: 'a,
        L::Root: CanSend<'a, ExternalProcess<'a, L2>, In<Bytes> = T, Out<Bytes> = Bytes>,
    {
        let metadata = other.new_node_metadata::<Bytes>();

        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
        // Allocate a fresh key that identifies this external output port.
        let external_key = flow_state_borrow.next_external_out;
        flow_state_borrow.next_external_out += 1;

        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");

        // The leaf needs an `f` expression, but nothing runs on this side; use a dummy.
        let dummy_f: syn::Expr = syn::parse_quote!(());

        leaves.push(HydroLeaf::ForEach {
            f: dummy_f.into(),
            input: Box::new(HydroNode::Network {
                from_key: None,
                to_location: other.id(),
                to_key: Some(external_key),
                // Raw bytes in, raw bytes out: no (de)serialization in the graph.
                serialize_fn: None,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                input: Box::new(self.ir_node.into_inner()),
                metadata: metadata.clone(),
            }),
            metadata,
        });

        ExternalBytesPort {
            process_id: other.id,
            port_id: external_key,
        }
    }
2260
2261 pub fn send_bincode_anonymous<L2, Tag, CoreType>(
2262 self,
2263 other: &L2,
2264 ) -> Stream<CoreType, L2, Unbounded, O::Min, R>
2265 where
2266 L2: Location<'a>,
2267 L::Root: CanSend<'a, L2, In<CoreType> = T, Out<CoreType> = (Tag, CoreType)>,
2268 CoreType: Serialize + DeserializeOwned,
2269 O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2270 {
2271 self.send_bincode::<L2, CoreType>(other).map(q!(|(_, b)| b))
2272 }
2273
2274 pub fn send_bytes_anonymous<L2, Tag>(
2275 self,
2276 other: &L2,
2277 ) -> Stream<Bytes, L2, Unbounded, O::Min, R>
2278 where
2279 L2: Location<'a>,
2280 L::Root: CanSend<'a, L2, In<Bytes> = T, Out<Bytes> = (Tag, Bytes)>,
2281 O: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<O>>,
2282 {
2283 self.send_bytes::<L2>(other).map(q!(|(_, b)| b))
2284 }
2285
2286 #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
2287 pub fn broadcast_bincode<C2>(
2288 self,
2289 other: &Cluster<'a, C2>,
2290 ) -> Stream<
2291 <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
2292 Cluster<'a, C2>,
2293 Unbounded,
2294 O::Min,
2295 R,
2296 >
2297 where
2298 C2: 'a,
2299 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
2300 T: Clone + Serialize + DeserializeOwned,
2301 O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2302 {
2303 let ids = other.members();
2304
2305 let to_send: Stream<(u32, Bytes), L, B, O, R> = self
2306 .map::<Bytes, _>(q!(|v| bincode::serialize(&v).unwrap().into()))
2307 .flat_map_ordered(q!(|v| { ids.iter().map(move |id| (id.raw_id, v.clone())) }));
2308
2309 let deserialize_pipeline = Some(deserialize_bincode::<T>(L::Root::tagged_type().as_ref()));
2310
2311 Stream::new(
2312 other.clone(),
2313 HydroNode::Network {
2314 from_key: None,
2315 to_location: other.id(),
2316 to_key: None,
2317 serialize_fn: None,
2318 instantiate_fn: DebugInstantiate::Building,
2319 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
2320 input: Box::new(to_send.ir_node.into_inner()),
2321 metadata: other.new_node_metadata::<T>(),
2322 },
2323 )
2324 }
2325
2326 pub fn broadcast_bincode_anonymous<C2, Tag>(
2327 self,
2328 other: &Cluster<'a, C2>,
2329 ) -> Stream<T, Cluster<'a, C2>, Unbounded, O::Min, R>
2330 where
2331 C2: 'a,
2332 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)>,
2333 T: Clone + Serialize + DeserializeOwned,
2334 O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2335 {
2336 self.broadcast_bincode(other).map(q!(|(_, b)| b))
2337 }
2338
2339 #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
2340 pub fn broadcast_bytes<C2>(
2341 self,
2342 other: &Cluster<'a, C2>,
2343 ) -> Stream<
2344 <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2345 Cluster<'a, C2>,
2346 Unbounded,
2347 O::Min,
2348 R,
2349 >
2350 where
2351 C2: 'a,
2352 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)>,
2353 T: Clone,
2354 O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2355 {
2356 let ids = other.members();
2357
2358 self.flat_map_ordered(q!(|b| ids.iter().map(move |id| (
2359 ::std::clone::Clone::clone(id),
2360 ::std::clone::Clone::clone(&b)
2361 ))))
2362 .send_bytes(other)
2363 }
2364
2365 pub fn broadcast_bytes_anonymous<C2, Tag>(
2366 self,
2367 other: &Cluster<'a, C2>,
2368 ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded, O::Min, R>
2369 where
2370 C2: 'a,
2371 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2372 + 'a,
2373 T: Clone,
2374 O: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<O>>,
2375 {
2376 self.broadcast_bytes(other).map(q!(|(_, b)| b))
2377 }
2378}
2379
2380#[expect(clippy::type_complexity, reason = "ordering semantics for round-robin")]
2381impl<'a, T, L, B> Stream<T, L, B, TotalOrder, ExactlyOnce>
2382where
2383 L: Location<'a> + NoTick,
2384{
2385 pub fn round_robin_bincode<C2>(
2386 self,
2387 other: &Cluster<'a, C2>,
2388 ) -> Stream<
2389 <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
2390 Cluster<'a, C2>,
2391 Unbounded,
2392 <TotalOrder as MinOrder<
2393 <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2394 >>::Min,
2395 ExactlyOnce,
2396 >
2397 where
2398 C2: 'a,
2399 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
2400 T: Clone + Serialize + DeserializeOwned,
2401 TotalOrder:
2402 MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2403 {
2404 let ids = other.members();
2405
2406 self.enumerate()
2407 .map(q!(|(i, w)| (ids[i % ids.len()], w)))
2408 .send_bincode(other)
2409 }
2410
2411 pub fn round_robin_bincode_anonymous<C2, Tag>(
2412 self,
2413 other: &Cluster<'a, C2>,
2414 ) -> Stream<
2415 T,
2416 Cluster<'a, C2>,
2417 Unbounded,
2418 <TotalOrder as MinOrder<
2419 <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2420 >>::Min,
2421 ExactlyOnce,
2422 >
2423 where
2424 C2: 'a,
2425 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
2426 T: Clone + Serialize + DeserializeOwned,
2427 TotalOrder:
2428 MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2429 {
2430 self.round_robin_bincode(other).map(q!(|(_, b)| b))
2431 }
2432
2433 pub fn round_robin_bytes<C2>(
2434 self,
2435 other: &Cluster<'a, C2>,
2436 ) -> Stream<
2437 <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2438 Cluster<'a, C2>,
2439 Unbounded,
2440 <TotalOrder as MinOrder<
2441 <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2442 >>::Min,
2443 ExactlyOnce,
2444 >
2445 where
2446 C2: 'a,
2447 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
2448 T: Clone,
2449 TotalOrder:
2450 MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2451 {
2452 let ids = other.members();
2453
2454 self.enumerate()
2455 .map(q!(|(i, w)| (ids[i % ids.len()], w)))
2456 .send_bytes(other)
2457 }
2458
2459 pub fn round_robin_bytes_anonymous<C2, Tag>(
2460 self,
2461 other: &Cluster<'a, C2>,
2462 ) -> Stream<
2463 Bytes,
2464 Cluster<'a, C2>,
2465 Unbounded,
2466 <TotalOrder as MinOrder<
2467 <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
2468 >>::Min,
2469 ExactlyOnce,
2470 >
2471 where
2472 C2: 'a,
2473 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2474 + 'a,
2475 T: Clone,
2476 TotalOrder:
2477 MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
2478 {
2479 self.round_robin_bytes(other).map(q!(|(_, b)| b))
2480 }
2481}
2482
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::FlowBuilder;
    use crate::location::Location;

    struct P1 {}
    struct P2 {}

    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// End-to-end check: 0..10 flows P1 -> P2 -> external, preserving order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let sender = flow.process::<P1>();
        let receiver = flow.process::<P2>();
        let external = flow.external_process::<P2>();

        // Stage the dataflow: wrap each number and forward it across the network.
        let out_port = sender
            .source_iter(q!(0..10))
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&receiver)
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&sender, deployment.Localhost())
            .with_process(&receiver, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        // TotalOrder + ExactlyOnce across bincode sends: output must be exactly 0..10.
        for expected in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, expected);
        }
    }
}