hydro_lang/stream.rs
1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::future::Future;
4use std::hash::Hash;
5use std::marker::PhantomData;
6use std::ops::Deref;
7use std::rc::Rc;
8
9use stageleft::{IntoQuotedMut, QuotedWithContext, q};
10use syn::parse_quote;
11use tokio::time::Instant;
12
13use crate::boundedness::Boundedness;
14use crate::builder::FLOW_USED_MESSAGE;
15use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
16use crate::ir::{HydroLeaf, HydroNode, TeeNode};
17use crate::keyed_stream::KeyedStream;
18use crate::location::tick::{Atomic, NoAtomic};
19use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
20use crate::manual_expr::ManualExpr;
21use crate::unsafety::NonDet;
22use crate::*;
23
24pub mod networking;
25
26/// Marks the stream as being totally ordered, which means that there are
27/// no sources of non-determinism (other than intentional ones) that will
28/// affect the order of elements.
29pub enum TotalOrder {}
30
31/// Marks the stream as having no order, which means that the order of
32/// elements may be affected by non-determinism.
33///
34/// This restricts certain operators, such as `fold` and `reduce`, to only
35/// be used with commutative aggregation functions.
36pub enum NoOrder {}
37
38/// Helper trait for determining the weakest of two orderings.
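///
/// A small compile-time sketch (the `same_type` helper below is only for
/// illustration, not part of the API), assuming the marker types are imported
/// from `hydro_lang::stream`:
///
/// ```rust
/// use hydro_lang::stream::{MinOrder, NoOrder, TotalOrder};
///
/// fn same_type<A, B: MinOrder<A, Min = A>>() {}
///
/// // The weaker of `TotalOrder` and `NoOrder` is `NoOrder`.
/// same_type::<NoOrder, TotalOrder>();
/// ```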
39#[sealed::sealed]
40pub trait MinOrder<Other> {
41 /// The weaker of the two orderings.
42 type Min;
43}
44
45#[sealed::sealed]
46impl<T> MinOrder<T> for T {
47 type Min = T;
48}
49
50#[sealed::sealed]
51impl MinOrder<NoOrder> for TotalOrder {
52 type Min = NoOrder;
53}
54
55#[sealed::sealed]
56impl MinOrder<TotalOrder> for NoOrder {
57 type Min = NoOrder;
58}
59
60/// Marks the stream as having deterministic message cardinality, with no
61/// possibility of duplicates.
62pub enum ExactlyOnce {}
63
64/// Marks the stream as having non-deterministic message cardinality, which
65/// means that duplicates may occur, but messages will not be dropped.
66pub enum AtLeastOnce {}
67
68/// Helper trait for determining the weakest of two retry guarantees.
69#[sealed::sealed]
70pub trait MinRetries<Other> {
71 /// The weaker of the two retry guarantees.
72 type Min;
73}
74
75#[sealed::sealed]
76impl<T> MinRetries<T> for T {
77 type Min = T;
78}
79
80#[sealed::sealed]
81impl MinRetries<ExactlyOnce> for AtLeastOnce {
82 type Min = AtLeastOnce;
83}
84
85#[sealed::sealed]
86impl MinRetries<AtLeastOnce> for ExactlyOnce {
87 type Min = AtLeastOnce;
88}
89
/// An ordered sequence of elements of type `T`.
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`]
///   or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`]
///   or [`NoOrder`] (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`]
///   or [`AtLeastOnce`] (default is [`ExactlyOnce`])
99pub struct Stream<Type, Loc, Bound: Boundedness, Order = TotalOrder, Retries = ExactlyOnce> {
100 pub(crate) location: Loc,
101 pub(crate) ir_node: RefCell<HydroNode>,
102
103 _phantom: PhantomData<(Type, Loc, Bound, Order, Retries)>,
104}
105
106impl<'a, T, L, O, R> From<Stream<T, L, Bounded, O, R>> for Stream<T, L, Unbounded, O, R>
107where
108 L: Location<'a>,
109{
110 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
111 Stream {
112 location: stream.location,
113 ir_node: stream.ir_node,
114 _phantom: PhantomData,
115 }
116 }
117}
118
119impl<'a, T, L, B: Boundedness, R> From<Stream<T, L, B, TotalOrder, R>>
120 for Stream<T, L, B, NoOrder, R>
121where
122 L: Location<'a>,
123{
124 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
125 Stream {
126 location: stream.location,
127 ir_node: stream.ir_node,
128 _phantom: PhantomData,
129 }
130 }
131}
132
133impl<'a, T, L, B: Boundedness, O> From<Stream<T, L, B, O, ExactlyOnce>>
134 for Stream<T, L, B, O, AtLeastOnce>
135where
136 L: Location<'a>,
137{
138 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
139 Stream {
140 location: stream.location,
141 ir_node: stream.ir_node,
142 _phantom: PhantomData,
143 }
144 }
145}
146
147impl<'a, T, L, O, R> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
148where
149 L: Location<'a>,
150{
151 fn defer_tick(self) -> Self {
152 Stream::defer_tick(self)
153 }
154}
155
156impl<'a, T, L, O, R> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
157where
158 L: Location<'a>,
159{
160 type Location = Tick<L>;
161
162 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
163 Stream::new(
164 location.clone(),
165 HydroNode::CycleSource {
166 ident,
167 metadata: location.new_node_metadata::<T>(),
168 },
169 )
170 }
171}
172
173impl<'a, T, L, O, R> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
174where
175 L: Location<'a>,
176{
177 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
178 assert_eq!(
179 self.location.id(),
180 expected_location,
181 "locations do not match"
182 );
183 self.location
184 .flow_state()
185 .borrow_mut()
186 .leaves
187 .as_mut()
188 .expect(FLOW_USED_MESSAGE)
189 .push(HydroLeaf::CycleSink {
190 ident,
191 input: Box::new(self.ir_node.into_inner()),
192 metadata: self.location.new_node_metadata::<T>(),
193 });
194 }
195}
196
197impl<'a, T, L, B: Boundedness, O, R> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
198where
199 L: Location<'a> + NoTick,
200{
201 type Location = L;
202
203 fn create_source(ident: syn::Ident, location: L) -> Self {
204 Stream::new(
205 location.clone(),
206 HydroNode::Persist {
207 inner: Box::new(HydroNode::CycleSource {
208 ident,
209 metadata: location.new_node_metadata::<T>(),
210 }),
211 metadata: location.new_node_metadata::<T>(),
212 },
213 )
214 }
215}
216
217impl<'a, T, L, B: Boundedness, O, R> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
218where
219 L: Location<'a> + NoTick,
220{
221 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
222 assert_eq!(
223 self.location.id(),
224 expected_location,
225 "locations do not match"
226 );
227 let metadata = self.location.new_node_metadata::<T>();
228 self.location
229 .flow_state()
230 .borrow_mut()
231 .leaves
232 .as_mut()
233 .expect(FLOW_USED_MESSAGE)
234 .push(HydroLeaf::CycleSink {
235 ident,
236 input: Box::new(HydroNode::Unpersist {
237 inner: Box::new(self.ir_node.into_inner()),
238 metadata: metadata.clone(),
239 }),
240 metadata,
241 });
242 }
243}
244
245impl<'a, T, L, B: Boundedness, O, R> Clone for Stream<T, L, B, O, R>
246where
247 T: Clone,
248 L: Location<'a>,
249{
250 fn clone(&self) -> Self {
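        // On the first clone, wrap the original node in a `Tee` in place, so that
        // this handle and every subsequent clone share the same underlying node
        // through the `Rc` inside `TeeNode`.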
251 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
252 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
253 *self.ir_node.borrow_mut() = HydroNode::Tee {
254 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
255 metadata: self.location.new_node_metadata::<T>(),
256 };
257 }
258
259 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
260 Stream {
261 location: self.location.clone(),
262 ir_node: HydroNode::Tee {
263 inner: TeeNode(inner.0.clone()),
264 metadata: metadata.clone(),
265 }
266 .into(),
267 _phantom: PhantomData,
268 }
269 } else {
270 unreachable!()
271 }
272 }
273}
274
275impl<'a, T, L, B: Boundedness, O, R> Stream<T, L, B, O, R>
276where
277 L: Location<'a>,
278{
279 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
280 Stream {
281 location,
282 ir_node: RefCell::new(ir_node),
283 _phantom: PhantomData,
284 }
285 }
286
287 /// Produces a stream based on invoking `f` on each element.
288 /// If you do not want to modify the stream and instead only want to view
    /// each item, use [`Stream::inspect`] instead.
290 ///
291 /// # Example
292 /// ```rust
293 /// # use hydro_lang::*;
294 /// # use futures::StreamExt;
295 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
296 /// let words = process.source_iter(q!(vec!["hello", "world"]));
297 /// words.map(q!(|x| x.to_uppercase()))
298 /// # }, |mut stream| async move {
299 /// # for w in vec!["HELLO", "WORLD"] {
300 /// # assert_eq!(stream.next().await.unwrap(), w);
301 /// # }
302 /// # }));
303 /// ```
304 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
305 where
306 F: Fn(T) -> U + 'a,
307 {
308 let f = f.splice_fn1_ctx(&self.location).into();
309 Stream::new(
310 self.location.clone(),
311 HydroNode::Map {
312 f,
313 input: Box::new(self.ir_node.into_inner()),
314 metadata: self.location.new_node_metadata::<U>(),
315 },
316 )
317 }
318
319 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
320 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
321 /// for the output type `U` must produce items in a **deterministic** order.
322 ///
323 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
324 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
325 ///
326 /// # Example
327 /// ```rust
328 /// # use hydro_lang::*;
329 /// # use futures::StreamExt;
330 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
331 /// process
332 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
333 /// .flat_map_ordered(q!(|x| x))
334 /// # }, |mut stream| async move {
335 /// // 1, 2, 3, 4
336 /// # for w in (1..5) {
337 /// # assert_eq!(stream.next().await.unwrap(), w);
338 /// # }
339 /// # }));
340 /// ```
341 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
342 where
343 I: IntoIterator<Item = U>,
344 F: Fn(T) -> I + 'a,
345 {
346 let f = f.splice_fn1_ctx(&self.location).into();
347 Stream::new(
348 self.location.clone(),
349 HydroNode::FlatMap {
350 f,
351 input: Box::new(self.ir_node.into_inner()),
352 metadata: self.location.new_node_metadata::<U>(),
353 },
354 )
355 }
356
357 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
358 /// for the output type `U` to produce items in any order.
359 ///
360 /// # Example
361 /// ```rust
362 /// # use hydro_lang::{*, stream::ExactlyOnce};
363 /// # use futures::StreamExt;
364 /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
365 /// process
366 /// .source_iter(q!(vec![
367 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
368 /// std::collections::HashSet::from_iter(vec![3, 4]),
369 /// ]))
370 /// .flat_map_unordered(q!(|x| x))
371 /// # }, |mut stream| async move {
372 /// // 1, 2, 3, 4, but in no particular order
373 /// # let mut results = Vec::new();
374 /// # for w in (1..5) {
375 /// # results.push(stream.next().await.unwrap());
376 /// # }
377 /// # results.sort();
378 /// # assert_eq!(results, vec![1, 2, 3, 4]);
379 /// # }));
380 /// ```
381 pub fn flat_map_unordered<U, I, F>(
382 self,
383 f: impl IntoQuotedMut<'a, F, L>,
384 ) -> Stream<U, L, B, NoOrder, R>
385 where
386 I: IntoIterator<Item = U>,
387 F: Fn(T) -> I + 'a,
388 {
389 let f = f.splice_fn1_ctx(&self.location).into();
390 Stream::new(
391 self.location.clone(),
392 HydroNode::FlatMap {
393 f,
394 input: Box::new(self.ir_node.into_inner()),
395 metadata: self.location.new_node_metadata::<U>(),
396 },
397 )
398 }
399
400 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
401 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
402 ///
403 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
404 /// not deterministic, use [`Stream::flatten_unordered`] instead.
405 ///
406 /// ```rust
407 /// # use hydro_lang::*;
408 /// # use futures::StreamExt;
409 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
410 /// process
411 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
412 /// .flatten_ordered()
413 /// # }, |mut stream| async move {
414 /// // 1, 2, 3, 4
415 /// # for w in (1..5) {
416 /// # assert_eq!(stream.next().await.unwrap(), w);
417 /// # }
418 /// # }));
419 /// ```
420 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
421 where
422 T: IntoIterator<Item = U>,
423 {
424 self.flat_map_ordered(q!(|d| d))
425 }
426
427 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
428 /// for the element type `T` to produce items in any order.
429 ///
430 /// # Example
431 /// ```rust
432 /// # use hydro_lang::{*, stream::ExactlyOnce};
433 /// # use futures::StreamExt;
434 /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
435 /// process
436 /// .source_iter(q!(vec![
437 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
438 /// std::collections::HashSet::from_iter(vec![3, 4]),
439 /// ]))
440 /// .flatten_unordered()
441 /// # }, |mut stream| async move {
442 /// // 1, 2, 3, 4, but in no particular order
443 /// # let mut results = Vec::new();
444 /// # for w in (1..5) {
445 /// # results.push(stream.next().await.unwrap());
446 /// # }
447 /// # results.sort();
448 /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
450 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
451 where
452 T: IntoIterator<Item = U>,
453 {
454 self.flat_map_unordered(q!(|d| d))
455 }
456
457 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
458 /// `f`, preserving the order of the elements.
459 ///
460 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
461 /// not modify or take ownership of the values. If you need to modify the values while filtering
462 /// use [`Stream::filter_map`] instead.
463 ///
464 /// # Example
465 /// ```rust
466 /// # use hydro_lang::*;
467 /// # use futures::StreamExt;
468 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
469 /// process
470 /// .source_iter(q!(vec![1, 2, 3, 4]))
471 /// .filter(q!(|&x| x > 2))
472 /// # }, |mut stream| async move {
473 /// // 3, 4
474 /// # for w in (3..5) {
475 /// # assert_eq!(stream.next().await.unwrap(), w);
476 /// # }
477 /// # }));
478 /// ```
479 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
480 where
481 F: Fn(&T) -> bool + 'a,
482 {
483 let f = f.splice_fn1_borrow_ctx(&self.location).into();
484 Stream::new(
485 self.location.clone(),
486 HydroNode::Filter {
487 f,
488 input: Box::new(self.ir_node.into_inner()),
489 metadata: self.location.new_node_metadata::<T>(),
490 },
491 )
492 }
493
494 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
495 ///
496 /// # Example
497 /// ```rust
498 /// # use hydro_lang::*;
499 /// # use futures::StreamExt;
500 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
501 /// process
502 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
503 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
504 /// # }, |mut stream| async move {
505 /// // 1, 2
506 /// # for w in (1..3) {
507 /// # assert_eq!(stream.next().await.unwrap(), w);
508 /// # }
    /// # }));
    /// ```
510 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
511 where
512 F: Fn(T) -> Option<U> + 'a,
513 {
514 let f = f.splice_fn1_ctx(&self.location).into();
515 Stream::new(
516 self.location.clone(),
517 HydroNode::FilterMap {
518 f,
519 input: Box::new(self.ir_node.into_inner()),
520 metadata: self.location.new_node_metadata::<U>(),
521 },
522 )
523 }
524
525 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
526 /// where `x` is the final value of `other`, a bounded [`Singleton`].
527 ///
528 /// # Example
529 /// ```rust
530 /// # use hydro_lang::*;
531 /// # use futures::StreamExt;
532 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
533 /// let tick = process.tick();
534 /// let batch = process
535 /// .source_iter(q!(vec![1, 2, 3, 4]))
536 /// .batch(&tick, nondet!(/** test */));
537 /// let count = batch.clone().count(); // `count()` returns a singleton
538 /// batch.cross_singleton(count).all_ticks()
539 /// # }, |mut stream| async move {
540 /// // (1, 4), (2, 4), (3, 4), (4, 4)
541 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
542 /// # assert_eq!(stream.next().await.unwrap(), w);
543 /// # }
    /// # }));
    /// ```
545 pub fn cross_singleton<O2>(
546 self,
547 other: impl Into<Optional<O2, L, Bounded>>,
548 ) -> Stream<(T, O2), L, B, O, R>
549 where
550 O2: Clone,
551 {
552 let other: Optional<O2, L, Bounded> = other.into();
553 check_matching_location(&self.location, &other.location);
554
555 Stream::new(
556 self.location.clone(),
557 HydroNode::CrossSingleton {
558 left: Box::new(self.ir_node.into_inner()),
559 right: Box::new(other.ir_node.into_inner()),
560 metadata: self.location.new_node_metadata::<(T, O2)>(),
561 },
562 )
563 }
564
    /// Passes this stream through unchanged if `signal`, a [`Bounded`] [`Optional`], is non-empty; otherwise the output is empty.
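    ///
    /// # Example
    /// A minimal sketch, using a non-empty [`Optional`] (the batch's max) as the signal
    /// so that the batch is passed through unchanged:
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let signal = batch.clone().max(); // non-empty, so the stream passes through
    /// batch.continue_if(signal).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```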
566 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
567 self.cross_singleton(signal.map(q!(|_u| ())))
568 .map(q!(|(d, _signal)| d))
569 }
570
    /// Passes this stream through unchanged if `other`, a [`Bounded`] [`Optional`], is empty; otherwise the output is empty.
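    ///
    /// # Example
    /// A minimal sketch, using an empty [`Optional`] (no element exceeds 10) so that
    /// the batch is passed through unchanged:
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let blocker = batch.clone().filter(q!(|&x| x > 10)).max(); // empty
    /// batch.continue_unless(blocker).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```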
572 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
573 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
574 }
575
576 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
577 /// tupled pairs in a non-deterministic order.
578 ///
579 /// # Example
580 /// ```rust
581 /// # use hydro_lang::*;
582 /// # use std::collections::HashSet;
583 /// # use futures::StreamExt;
584 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
586 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
587 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
588 /// stream1.cross_product(stream2)
589 /// # }, |mut stream| async move {
590 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
591 /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// ```
593 pub fn cross_product<T2, O2>(
594 self,
595 other: Stream<T2, L, B, O2, R>,
596 ) -> Stream<(T, T2), L, B, NoOrder, R>
597 where
598 T: Clone,
599 T2: Clone,
600 {
601 check_matching_location(&self.location, &other.location);
602
603 Stream::new(
604 self.location.clone(),
605 HydroNode::CrossProduct {
606 left: Box::new(self.ir_node.into_inner()),
607 right: Box::new(other.ir_node.into_inner()),
608 metadata: self.location.new_node_metadata::<(T, T2)>(),
609 },
610 )
611 }
612
613 /// Takes one stream as input and filters out any duplicate occurrences. The output
614 /// contains all unique values from the input.
615 ///
616 /// # Example
617 /// ```rust
618 /// # use hydro_lang::*;
619 /// # use futures::StreamExt;
620 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
622 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
623 /// # }, |mut stream| async move {
624 /// # for w in vec![1, 2, 3, 4] {
625 /// # assert_eq!(stream.next().await.unwrap(), w);
626 /// # }
    /// # }));
    /// ```
628 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
629 where
630 T: Eq + Hash,
631 {
632 Stream::new(
633 self.location.clone(),
634 HydroNode::Unique {
635 input: Box::new(self.ir_node.into_inner()),
636 metadata: self.location.new_node_metadata::<T>(),
637 },
638 )
639 }
640
641 /// Outputs everything in this stream that is *not* contained in the `other` stream.
642 ///
643 /// The `other` stream must be [`Bounded`], since this function will wait until
644 /// all its elements are available before producing any output.
645 /// # Example
646 /// ```rust
647 /// # use hydro_lang::*;
648 /// # use futures::StreamExt;
649 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
650 /// let tick = process.tick();
651 /// let stream = process
652 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
653 /// .batch(&tick, nondet!(/** test */));
654 /// let batch = process
655 /// .source_iter(q!(vec![1, 2]))
656 /// .batch(&tick, nondet!(/** test */));
657 /// stream.filter_not_in(batch).all_ticks()
658 /// # }, |mut stream| async move {
659 /// # for w in vec![3, 4] {
660 /// # assert_eq!(stream.next().await.unwrap(), w);
661 /// # }
    /// # }));
    /// ```
663 pub fn filter_not_in<O2>(
664 self,
665 other: Stream<T, L, Bounded, O2, R>,
666 ) -> Stream<T, L, Bounded, O, R>
667 where
668 T: Eq + Hash,
669 {
670 check_matching_location(&self.location, &other.location);
671
672 Stream::new(
673 self.location.clone(),
674 HydroNode::Difference {
675 pos: Box::new(self.ir_node.into_inner()),
676 neg: Box::new(other.ir_node.into_inner()),
677 metadata: self.location.new_node_metadata::<T>(),
678 },
679 )
680 }
681
682 /// An operator which allows you to "inspect" each element of a stream without
683 /// modifying it. The closure `f` is called on a reference to each item. This is
684 /// mainly useful for debugging, and should not be used to generate side-effects.
685 ///
686 /// # Example
687 /// ```rust
688 /// # use hydro_lang::*;
689 /// # use futures::StreamExt;
690 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
691 /// let nums = process.source_iter(q!(vec![1, 2]));
692 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
693 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
694 /// # }, |mut stream| async move {
695 /// # for w in vec![1, 2] {
696 /// # assert_eq!(stream.next().await.unwrap(), w);
697 /// # }
698 /// # }));
699 /// ```
700 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
701 where
702 F: Fn(&T) + 'a,
703 {
704 let f = f.splice_fn1_borrow_ctx(&self.location).into();
705
706 if L::is_top_level() {
707 Stream::new(
708 self.location.clone(),
709 HydroNode::Persist {
710 inner: Box::new(HydroNode::Inspect {
711 f,
712 input: Box::new(HydroNode::Unpersist {
713 inner: Box::new(self.ir_node.into_inner()),
714 metadata: self.location.new_node_metadata::<T>(),
715 }),
716 metadata: self.location.new_node_metadata::<T>(),
717 }),
718 metadata: self.location.new_node_metadata::<T>(),
719 },
720 )
721 } else {
722 Stream::new(
723 self.location.clone(),
724 HydroNode::Inspect {
725 f,
726 input: Box::new(self.ir_node.into_inner()),
727 metadata: self.location.new_node_metadata::<T>(),
728 },
729 )
730 }
731 }
732
733 /// An operator which allows you to "name" a `HydroNode`.
734 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
735 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
736 {
737 let mut node = self.ir_node.borrow_mut();
738 let metadata = node.metadata_mut();
739 metadata.tag = Some(name.to_string());
740 }
741 self
742 }
743
744 /// Explicitly "casts" the stream to a type with a different ordering
745 /// guarantee. Useful in unsafe code where the ordering cannot be proven
746 /// by the type-system.
747 ///
748 /// # Non-Determinism
749 /// This function is used as an escape hatch, and any mistakes in the
750 /// provided ordering guarantee will propagate into the guarantees
751 /// for the rest of the program.
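    ///
    /// # Example
    /// A minimal sketch: the stream below is weakened to [`NoOrder`] and then cast back
    /// to [`TotalOrder`], which is justified here only because it came from a single
    /// deterministic in-memory source:
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])).weakest_ordering();
    /// numbers.assume_ordering::<TotalOrder>(nondet!(/** single deterministic source */))
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```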
752 pub fn assume_ordering<O2>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
753 Stream::new(self.location, self.ir_node.into_inner())
754 }
755
756 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
757 /// which is always safe because that is the weakest possible guarantee.
758 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
760 self.assume_ordering::<NoOrder>(nondet)
761 }
762
763 /// Explicitly "casts" the stream to a type with a different retries
764 /// guarantee. Useful in unsafe code where the lack of retries cannot
765 /// be proven by the type-system.
766 ///
767 /// # Non-Determinism
768 /// This function is used as an escape hatch, and any mistakes in the
769 /// provided retries guarantee will propagate into the guarantees
770 /// for the rest of the program.
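    ///
    /// # Example
    /// A minimal sketch: the stream below is weakened to [`AtLeastOnce`] and then cast
    /// back to [`ExactlyOnce`], which is justified here only because the in-memory
    /// source never retries:
    /// ```rust
    /// # use hydro_lang::{*, stream::ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])).weakest_retries();
    /// numbers.assume_retries::<ExactlyOnce>(nondet!(/** in-memory source never retries */))
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```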
771 pub fn assume_retries<R2>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
772 Stream::new(self.location, self.ir_node.into_inner())
773 }
774
775 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
776 /// which is always safe because that is the weakest possible guarantee.
777 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
778 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
779 self.assume_retries::<AtLeastOnce>(nondet)
780 }
781
782 /// Weakens the retries guarantee provided by the stream to be the weaker of the
783 /// current guarantee and `R2`. This is safe because the output guarantee will
784 /// always be weaker than the input.
785 pub fn weaken_retries<R2>(self) -> Stream<T, L, B, O, <R as MinRetries<R2>>::Min>
786 where
787 R: MinRetries<R2>,
788 {
789 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
790 self.assume_retries::<<R as MinRetries<R2>>::Min>(nondet)
791 }
792}
793
794impl<'a, T, L, B: Boundedness, O> Stream<T, L, B, O, ExactlyOnce>
795where
796 L: Location<'a>,
797{
798 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`].
800 pub fn weaker_retries<R2>(self) -> Stream<T, L, B, O, R2> {
801 self.assume_retries(
802 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
803 )
804 }
805}
806
807impl<'a, T, L, B: Boundedness, O, R> Stream<&T, L, B, O, R>
808where
809 L: Location<'a>,
810{
811 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
812 ///
813 /// # Example
814 /// ```rust
815 /// # use hydro_lang::*;
816 /// # use futures::StreamExt;
817 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
818 /// process.source_iter(q!(&[1, 2, 3])).cloned()
819 /// # }, |mut stream| async move {
820 /// // 1, 2, 3
821 /// # for w in vec![1, 2, 3] {
822 /// # assert_eq!(stream.next().await.unwrap(), w);
823 /// # }
824 /// # }));
825 /// ```
826 pub fn cloned(self) -> Stream<T, L, B, O, R>
827 where
828 T: Clone,
829 {
830 self.map(q!(|d| d.clone()))
831 }
832}
833
834impl<'a, T, L, B: Boundedness, O, R> Stream<T, L, B, O, R>
835where
836 L: Location<'a>,
837{
838 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
839 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
840 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
841 ///
842 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
843 /// and there may be duplicates.
844 ///
845 /// # Example
846 /// ```rust
847 /// # use hydro_lang::*;
848 /// # use futures::StreamExt;
849 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
850 /// let tick = process.tick();
851 /// let bools = process.source_iter(q!(vec![false, true, false]));
852 /// let batch = bools.batch(&tick, nondet!(/** test */));
853 /// batch
854 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
855 /// .all_ticks()
856 /// # }, |mut stream| async move {
857 /// // true
858 /// # assert_eq!(stream.next().await.unwrap(), true);
859 /// # }));
860 /// ```
861 pub fn fold_commutative_idempotent<A, I, F>(
862 self,
863 init: impl IntoQuotedMut<'a, I, L>,
864 comb: impl IntoQuotedMut<'a, F, L>,
865 ) -> Singleton<A, L, B>
866 where
867 I: Fn() -> A + 'a,
868 F: Fn(&mut A, T),
869 {
870 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
871 self.assume_ordering(nondet)
872 .assume_retries(nondet)
873 .fold(init, comb)
874 }
875
876 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
877 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
878 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
879 /// reference, so that it can be modified in place.
880 ///
881 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
882 /// and there may be duplicates.
883 ///
884 /// # Example
885 /// ```rust
886 /// # use hydro_lang::*;
887 /// # use futures::StreamExt;
888 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
889 /// let tick = process.tick();
890 /// let bools = process.source_iter(q!(vec![false, true, false]));
891 /// let batch = bools.batch(&tick, nondet!(/** test */));
892 /// batch
893 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
894 /// .all_ticks()
895 /// # }, |mut stream| async move {
896 /// // true
897 /// # assert_eq!(stream.next().await.unwrap(), true);
898 /// # }));
899 /// ```
900 pub fn reduce_commutative_idempotent<F>(
901 self,
902 comb: impl IntoQuotedMut<'a, F, L>,
903 ) -> Optional<T, L, B>
904 where
905 F: Fn(&mut T, T) + 'a,
906 {
907 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
908 self.assume_ordering(nondet)
909 .assume_retries(nondet)
910 .reduce(comb)
911 }
912
913 /// Computes the maximum element in the stream as an [`Optional`], which
914 /// will be empty until the first element in the input arrives.
915 ///
916 /// # Example
917 /// ```rust
918 /// # use hydro_lang::*;
919 /// # use futures::StreamExt;
920 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
921 /// let tick = process.tick();
922 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
923 /// let batch = numbers.batch(&tick, nondet!(/** test */));
924 /// batch.max().all_ticks()
925 /// # }, |mut stream| async move {
926 /// // 4
927 /// # assert_eq!(stream.next().await.unwrap(), 4);
928 /// # }));
929 /// ```
930 pub fn max(self) -> Optional<T, L, B>
931 where
932 T: Ord,
933 {
934 self.reduce_commutative_idempotent(q!(|curr, new| {
935 if new > *curr {
936 *curr = new;
937 }
938 }))
939 }
940
941 /// Computes the maximum element in the stream as an [`Optional`], where the
942 /// maximum is determined according to the `key` function. The [`Optional`] will
943 /// be empty until the first element in the input arrives.
944 ///
945 /// # Example
946 /// ```rust
947 /// # use hydro_lang::*;
948 /// # use futures::StreamExt;
949 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
950 /// let tick = process.tick();
951 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
952 /// let batch = numbers.batch(&tick, nondet!(/** test */));
953 /// batch.max_by_key(q!(|x| -x)).all_ticks()
954 /// # }, |mut stream| async move {
955 /// // 1
956 /// # assert_eq!(stream.next().await.unwrap(), 1);
957 /// # }));
958 /// ```
959 pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
960 where
961 K: Ord,
962 F: Fn(&T) -> K + 'a,
963 {
964 let f = key.splice_fn1_borrow_ctx(&self.location);
965
966 let wrapped: syn::Expr = parse_quote!({
967 let key_fn = #f;
968 move |curr, new| {
969 if key_fn(&new) > key_fn(&*curr) {
970 *curr = new;
971 }
972 }
973 });
974
975 let mut core = HydroNode::Reduce {
976 f: wrapped.into(),
977 input: Box::new(self.ir_node.into_inner()),
978 metadata: self.location.new_node_metadata::<T>(),
979 };
980
981 if L::is_top_level() {
982 core = HydroNode::Persist {
983 inner: Box::new(core),
984 metadata: self.location.new_node_metadata::<T>(),
985 };
986 }
987
988 Optional::new(self.location, core)
989 }
990
991 /// Computes the minimum element in the stream as an [`Optional`], which
992 /// will be empty until the first element in the input arrives.
993 ///
994 /// # Example
995 /// ```rust
996 /// # use hydro_lang::*;
997 /// # use futures::StreamExt;
998 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
999 /// let tick = process.tick();
1000 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1001 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1002 /// batch.min().all_ticks()
1003 /// # }, |mut stream| async move {
1004 /// // 1
1005 /// # assert_eq!(stream.next().await.unwrap(), 1);
1006 /// # }));
1007 /// ```
1008 pub fn min(self) -> Optional<T, L, B>
1009 where
1010 T: Ord,
1011 {
1012 self.reduce_commutative_idempotent(q!(|curr, new| {
1013 if new < *curr {
1014 *curr = new;
1015 }
1016 }))
1017 }
1018}
1019
1020impl<'a, T, L, B: Boundedness, O> Stream<T, L, B, O, ExactlyOnce>
1021where
1022 L: Location<'a>,
1023{
1024 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1025 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1026 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1027 ///
1028 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1029 ///
1030 /// # Example
1031 /// ```rust
1032 /// # use hydro_lang::*;
1033 /// # use futures::StreamExt;
1034 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1035 /// let tick = process.tick();
1036 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1037 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1038 /// batch
1039 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1040 /// .all_ticks()
1041 /// # }, |mut stream| async move {
1042 /// // 10
1043 /// # assert_eq!(stream.next().await.unwrap(), 10);
1044 /// # }));
1045 /// ```
1046 pub fn fold_commutative<A, I, F>(
1047 self,
1048 init: impl IntoQuotedMut<'a, I, L>,
1049 comb: impl IntoQuotedMut<'a, F, L>,
1050 ) -> Singleton<A, L, B>
1051 where
1052 I: Fn() -> A + 'a,
1053 F: Fn(&mut A, T),
1054 {
1055 let nondet = nondet!(/** the combinator function is commutative */);
1056 self.assume_ordering(nondet).fold(init, comb)
1057 }
1058
    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1060 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1061 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1062 /// reference, so that it can be modified in place.
1063 ///
1064 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1065 ///
1066 /// # Example
1067 /// ```rust
1068 /// # use hydro_lang::*;
1069 /// # use futures::StreamExt;
1070 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1071 /// let tick = process.tick();
1072 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1073 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1074 /// batch
1075 /// .reduce_commutative(q!(|curr, new| *curr += new))
1076 /// .all_ticks()
1077 /// # }, |mut stream| async move {
1078 /// // 10
1079 /// # assert_eq!(stream.next().await.unwrap(), 10);
1080 /// # }));
1081 /// ```
1082 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1083 where
1084 F: Fn(&mut T, T) + 'a,
1085 {
1086 let nondet = nondet!(/** the combinator function is commutative */);
1087 self.assume_ordering(nondet).reduce(comb)
1088 }
1089
1090 /// Computes the number of elements in the stream as a [`Singleton`].
1091 ///
1092 /// # Example
1093 /// ```rust
1094 /// # use hydro_lang::*;
1095 /// # use futures::StreamExt;
1096 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1097 /// let tick = process.tick();
1098 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1099 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1100 /// batch.count().all_ticks()
1101 /// # }, |mut stream| async move {
1102 /// // 4
1103 /// # assert_eq!(stream.next().await.unwrap(), 4);
1104 /// # }));
1105 /// ```
1106 pub fn count(self) -> Singleton<usize, L, B> {
1107 self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1108 }
1109}
1110
1111impl<'a, T, L, B: Boundedness, R> Stream<T, L, B, TotalOrder, R>
1112where
1113 L: Location<'a>,
1114{
1115 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1116 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1117 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1118 ///
1119 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1120 ///
1121 /// # Example
1122 /// ```rust
1123 /// # use hydro_lang::*;
1124 /// # use futures::StreamExt;
1125 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1126 /// let tick = process.tick();
1127 /// let bools = process.source_iter(q!(vec![false, true, false]));
1128 /// let batch = bools.batch(&tick, nondet!(/** test */));
1129 /// batch
1130 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1131 /// .all_ticks()
1132 /// # }, |mut stream| async move {
1133 /// // true
1134 /// # assert_eq!(stream.next().await.unwrap(), true);
1135 /// # }));
1136 /// ```
1137 pub fn fold_idempotent<A, I, F>(
1138 self,
1139 init: impl IntoQuotedMut<'a, I, L>,
1140 comb: impl IntoQuotedMut<'a, F, L>,
1141 ) -> Singleton<A, L, B>
1142 where
1143 I: Fn() -> A + 'a,
1144 F: Fn(&mut A, T),
1145 {
1146 let nondet = nondet!(/** the combinator function is idempotent */);
1147 self.assume_retries(nondet).fold(init, comb)
1148 }
1149
1150 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1151 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1152 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1153 /// reference, so that it can be modified in place.
1154 ///
1155 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1156 ///
1157 /// # Example
1158 /// ```rust
1159 /// # use hydro_lang::*;
1160 /// # use futures::StreamExt;
1161 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1162 /// let tick = process.tick();
1163 /// let bools = process.source_iter(q!(vec![false, true, false]));
1164 /// let batch = bools.batch(&tick, nondet!(/** test */));
1165 /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1166 /// # }, |mut stream| async move {
1167 /// // true
1168 /// # assert_eq!(stream.next().await.unwrap(), true);
1169 /// # }));
1170 /// ```
1171 pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1172 where
1173 F: Fn(&mut T, T) + 'a,
1174 {
1175 let nondet = nondet!(/** the combinator function is idempotent */);
1176 self.assume_retries(nondet).reduce(comb)
1177 }
1178
1179 /// Computes the first element in the stream as an [`Optional`], which
1180 /// will be empty until the first element in the input arrives.
1181 ///
1182 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1183 /// re-ordering of elements may cause the first element to change.
1184 ///
1185 /// # Example
1186 /// ```rust
1187 /// # use hydro_lang::*;
1188 /// # use futures::StreamExt;
1189 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1190 /// let tick = process.tick();
1191 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1192 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1193 /// batch.first().all_ticks()
1194 /// # }, |mut stream| async move {
1195 /// // 1
1196 /// # assert_eq!(stream.next().await.unwrap(), 1);
1197 /// # }));
1198 /// ```
1199 pub fn first(self) -> Optional<T, L, B> {
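        // Keep the first element and ignore all later ones; ignoring later elements is
        // idempotent, so duplicate deliveries cannot change the result.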
1200 self.reduce_idempotent(q!(|_, _| {}))
1201 }
1202
1203 /// Computes the last element in the stream as an [`Optional`], which
1204 /// will be empty until an element in the input arrives.
1205 ///
1206 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1207 /// re-ordering of elements may cause the last element to change.
1208 ///
1209 /// # Example
1210 /// ```rust
1211 /// # use hydro_lang::*;
1212 /// # use futures::StreamExt;
1213 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1214 /// let tick = process.tick();
1215 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1216 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1217 /// batch.last().all_ticks()
1218 /// # }, |mut stream| async move {
1219 /// // 4
1220 /// # assert_eq!(stream.next().await.unwrap(), 4);
1221 /// # }));
1222 /// ```
1223 pub fn last(self) -> Optional<T, L, B> {
1224 self.reduce_idempotent(q!(|curr, new| *curr = new))
1225 }
1226}
1227
1228impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1229where
1230 L: Location<'a>,
1231{
1232 /// Returns a stream with the current count tupled with each element in the input stream.
1233 ///
1234 /// # Example
1235 /// ```rust
1236 /// # use hydro_lang::{*, stream::ExactlyOnce};
1237 /// # use futures::StreamExt;
1238 /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1239 /// let tick = process.tick();
1240 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1241 /// numbers.enumerate()
1242 /// # }, |mut stream| async move {
1243 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1244 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1245 /// # assert_eq!(stream.next().await.unwrap(), w);
1246 /// # }
1247 /// # }));
1248 /// ```
1249 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1250 if L::is_top_level() {
1251 Stream::new(
1252 self.location.clone(),
1253 HydroNode::Persist {
1254 inner: Box::new(HydroNode::Enumerate {
1255 is_static: true,
1256 input: Box::new(HydroNode::Unpersist {
1257 inner: Box::new(self.ir_node.into_inner()),
1258 metadata: self.location.new_node_metadata::<T>(),
1259 }),
1260 metadata: self.location.new_node_metadata::<(usize, T)>(),
1261 }),
1262 metadata: self.location.new_node_metadata::<(usize, T)>(),
1263 },
1264 )
1265 } else {
1266 Stream::new(
1267 self.location.clone(),
1268 HydroNode::Enumerate {
1269 is_static: false,
1270 input: Box::new(self.ir_node.into_inner()),
1271 metadata: self.location.new_node_metadata::<(usize, T)>(),
1272 },
1273 )
1274 }
1275 }
1276
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1278 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1279 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1280 ///
1281 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1282 /// to depend on the order of elements in the stream.
1283 ///
1284 /// # Example
1285 /// ```rust
1286 /// # use hydro_lang::*;
1287 /// # use futures::StreamExt;
1288 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1289 /// let tick = process.tick();
1290 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1291 /// let batch = words.batch(&tick, nondet!(/** test */));
1292 /// batch
1293 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1294 /// .all_ticks()
1295 /// # }, |mut stream| async move {
1296 /// // "HELLOWORLD"
1297 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1298 /// # }));
1299 /// ```
1300 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1301 self,
1302 init: impl IntoQuotedMut<'a, I, L>,
1303 comb: impl IntoQuotedMut<'a, F, L>,
1304 ) -> Singleton<A, L, B> {
1305 let init = init.splice_fn0_ctx(&self.location).into();
1306 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1307
1308 let mut core = HydroNode::Fold {
1309 init,
1310 acc: comb,
1311 input: Box::new(self.ir_node.into_inner()),
1312 metadata: self.location.new_node_metadata::<A>(),
1313 };
1314
1315 if L::is_top_level() {
1316 // top-level (possibly unbounded) singletons are represented as
1317 // a stream which produces all values from all ticks every tick,
            // so Unpersist will always give the latest aggregation
1319 core = HydroNode::Persist {
1320 inner: Box::new(core),
1321 metadata: self.location.new_node_metadata::<A>(),
1322 };
1323 }
1324
1325 Singleton::new(self.location, core)
1326 }
1327
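    /// Collects all elements of the stream into a [`Singleton`] containing a [`Vec`],
    /// preserving the order of the input.
    ///
    /// # Example
    /// A minimal sketch, following the same pattern as the other examples in this file:
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.collect_vec().all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # assert_eq!(stream.next().await.unwrap(), vec![1, 2, 3, 4]);
    /// # }));
    /// ```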
1328 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1329 self.fold(
1330 q!(|| vec![]),
1331 q!(|acc, v| {
1332 acc.push(v);
1333 }),
1334 )
1335 }
1336
1337 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1338 /// and emitting each intermediate result.
1339 ///
1340 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1341 /// containing all intermediate accumulated values. The scan operation can also terminate early
1342 /// by returning `None`.
1343 ///
1344 /// The function takes a mutable reference to the accumulator and the current element, and returns
1345 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1346 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1347 ///
1348 /// # Examples
1349 ///
1350 /// Basic usage - running sum:
1351 /// ```rust
1352 /// # use hydro_lang::*;
1353 /// # use futures::StreamExt;
1354 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1355 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1356 /// q!(|| 0),
1357 /// q!(|acc, x| {
1358 /// *acc += x;
1359 /// Some(*acc)
1360 /// }),
1361 /// )
1362 /// # }, |mut stream| async move {
1363 /// // Output: 1, 3, 6, 10
1364 /// # for w in vec![1, 3, 6, 10] {
1365 /// # assert_eq!(stream.next().await.unwrap(), w);
1366 /// # }
1367 /// # }));
1368 /// ```
1369 ///
1370 /// Early termination example:
1371 /// ```rust
1372 /// # use hydro_lang::*;
1373 /// # use futures::StreamExt;
1374 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1375 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1376 /// q!(|| 1),
1377 /// q!(|state, x| {
1378 /// *state = *state * x;
1379 /// if *state > 6 {
1380 /// None // Terminate the stream
1381 /// } else {
1382 /// Some(-*state)
1383 /// }
1384 /// }),
1385 /// )
1386 /// # }, |mut stream| async move {
1387 /// // Output: -1, -2, -6
1388 /// # for w in vec![-1, -2, -6] {
1389 /// # assert_eq!(stream.next().await.unwrap(), w);
1390 /// # }
1391 /// # }));
1392 /// ```
1393 pub fn scan<A, U, I, F>(
1394 self,
1395 init: impl IntoQuotedMut<'a, I, L>,
1396 f: impl IntoQuotedMut<'a, F, L>,
1397 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1398 where
1399 I: Fn() -> A + 'a,
1400 F: Fn(&mut A, T) -> Option<U> + 'a,
1401 {
1402 let init = init.splice_fn0_ctx(&self.location).into();
1403 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1404
1405 if L::is_top_level() {
1406 Stream::new(
1407 self.location.clone(),
1408 HydroNode::Persist {
1409 inner: Box::new(HydroNode::Scan {
1410 init,
1411 acc: f,
1412 input: Box::new(HydroNode::Unpersist {
1413 inner: Box::new(self.ir_node.into_inner()),
1414 metadata: self.location.new_node_metadata::<U>(),
1415 }),
1416 metadata: self.location.new_node_metadata::<U>(),
1417 }),
1418 metadata: self.location.new_node_metadata::<U>(),
1419 },
1420 )
1421 } else {
1422 Stream::new(
1423 self.location.clone(),
1424 HydroNode::Scan {
1425 init,
1426 acc: f,
1427 input: Box::new(self.ir_node.into_inner()),
1428 metadata: self.location.new_node_metadata::<U>(),
1429 },
1430 )
1431 }
1432 }
1433
1434 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1435 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1436 /// until the first element in the input arrives.
1437 ///
1438 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1439 /// to depend on the order of elements in the stream.
1440 ///
1441 /// # Example
1442 /// ```rust
1443 /// # use hydro_lang::*;
1444 /// # use futures::StreamExt;
1445 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1446 /// let tick = process.tick();
1447 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1448 /// let batch = words.batch(&tick, nondet!(/** test */));
1449 /// batch
1450 /// .map(q!(|x| x.to_string()))
1451 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1452 /// .all_ticks()
1453 /// # }, |mut stream| async move {
1454 /// // "HELLOWORLD"
1455 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1456 /// # }));
1457 /// ```
1458 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1459 self,
1460 comb: impl IntoQuotedMut<'a, F, L>,
1461 ) -> Optional<T, L, B> {
1462 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1463 let mut core = HydroNode::Reduce {
1464 f,
1465 input: Box::new(self.ir_node.into_inner()),
1466 metadata: self.location.new_node_metadata::<T>(),
1467 };
1468
1469 if L::is_top_level() {
1470 core = HydroNode::Persist {
1471 inner: Box::new(core),
1472 metadata: self.location.new_node_metadata::<T>(),
1473 };
1474 }
1475
1476 Optional::new(self.location, core)
1477 }
1478}
1479
1480impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O, R> Stream<T, L, Unbounded, O, R> {
1481 /// Produces a new stream that interleaves the elements of the two input streams.
1482 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1483 ///
1484 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1485 /// [`Bounded`], you can use [`Stream::chain`] instead.
1486 ///
1487 /// # Example
1488 /// ```rust
1489 /// # use hydro_lang::*;
1490 /// # use futures::StreamExt;
1491 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1492 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1493 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1494 /// # }, |mut stream| async move {
1495 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1496 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1497 /// # assert_eq!(stream.next().await.unwrap(), w);
1498 /// # }
1499 /// # }));
1500 /// ```
1501 pub fn interleave<O2, R2: MinRetries<R>>(
1502 self,
1503 other: Stream<T, L, Unbounded, O2, R2>,
1504 ) -> Stream<T, L, Unbounded, NoOrder, R::Min>
1505 where
1506 R: MinRetries<R2, Min = R2::Min>,
1507 {
1508 let tick = self.location.tick();
1509 // Because the outputs are unordered, we can interleave batches from both streams.
1510 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1511 self.batch(&tick, nondet_batch_interleaving)
1512 .weakest_ordering()
1513 .weaken_retries::<R2>()
1514 .chain(
1515 other
1516 .batch(&tick, nondet_batch_interleaving)
1517 .weakest_ordering()
1518 .weaken_retries::<R>(),
1519 )
1520 .all_ticks()
1521 }
1522}
1523
1524impl<'a, T, L, O, R> Stream<T, L, Bounded, O, R>
1525where
1526 L: Location<'a>,
1527{
1528 /// Produces a new stream that emits the input elements in sorted order.
1529 ///
1530 /// The input stream can have any ordering guarantee, but the output stream
1531 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1532 /// elements in the input stream are available, so it requires the input stream
1533 /// to be [`Bounded`].
1534 ///
1535 /// # Example
1536 /// ```rust
1537 /// # use hydro_lang::*;
1538 /// # use futures::StreamExt;
1539 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1540 /// let tick = process.tick();
1541 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1542 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1543 /// batch.sort().all_ticks()
1544 /// # }, |mut stream| async move {
1545 /// // 1, 2, 3, 4
1546 /// # for w in (1..5) {
1547 /// # assert_eq!(stream.next().await.unwrap(), w);
1548 /// # }
1549 /// # }));
1550 /// ```
1551 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1552 where
1553 T: Ord,
1554 {
1555 Stream::new(
1556 self.location.clone(),
1557 HydroNode::Sort {
1558 input: Box::new(self.ir_node.into_inner()),
1559 metadata: self.location.new_node_metadata::<T>(),
1560 },
1561 )
1562 }
1563
1564 /// Produces a new stream that first emits the elements of the `self` stream,
1565 /// and then emits the elements of the `other` stream. The output stream has
1566 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1567 /// [`TotalOrder`] guarantee.
1568 ///
1569 /// Currently, both input streams must be [`Bounded`]. This operator will block
1570 /// on the first stream until all its elements are available. In a future version,
1571 /// we will relax the requirement on the `other` stream.
1572 ///
1573 /// # Example
1574 /// ```rust
1575 /// # use hydro_lang::*;
1576 /// # use futures::StreamExt;
1577 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1578 /// let tick = process.tick();
1579 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1580 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1581 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1582 /// # }, |mut stream| async move {
1583 /// // 2, 3, 4, 5, 1, 2, 3, 4
1584 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1585 /// # assert_eq!(stream.next().await.unwrap(), w);
1586 /// # }
1587 /// # }));
1588 /// ```
1589 pub fn chain<O2>(self, other: Stream<T, L, Bounded, O2, R>) -> Stream<T, L, Bounded, O::Min, R>
1590 where
1591 O: MinOrder<O2>,
1592 {
1593 check_matching_location(&self.location, &other.location);
1594
1595 Stream::new(
1596 self.location.clone(),
1597 HydroNode::Chain {
1598 first: Box::new(self.ir_node.into_inner()),
1599 second: Box::new(other.ir_node.into_inner()),
1600 metadata: self.location.new_node_metadata::<T>(),
1601 },
1602 )
1603 }
1604
    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
    /// Unlike [`Stream::cross_product`], the output retains a [`TotalOrder`] guarantee when both
    /// inputs are totally ordered, because this operator is compiled into a nested loop.
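    ///
    /// # Example
    /// A minimal sketch; both inputs are batched so that they are [`Bounded`]:
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let letters = process
    ///     .source_iter(q!(vec!['a', 'b']))
    ///     .batch(&tick, nondet!(/** test */));
    /// let numbers = process
    ///     .source_iter(q!(vec![1, 2]))
    ///     .batch(&tick, nondet!(/** test */));
    /// letters.cross_product_nested_loop(numbers).all_ticks()
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![('a', 1), ('a', 2), ('b', 1), ('b', 2)]);
    /// # }));
    /// ```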
1608 pub fn cross_product_nested_loop<T2, O2>(
1609 self,
1610 other: Stream<T2, L, Bounded, O2, R>,
1611 ) -> Stream<(T, T2), L, Bounded, O::Min, R>
1612 where
1613 T: Clone,
1614 T2: Clone,
1615 O: MinOrder<O2>,
1616 {
1617 check_matching_location(&self.location, &other.location);
1618
1619 Stream::new(
1620 self.location.clone(),
1621 HydroNode::CrossProduct {
1622 left: Box::new(self.ir_node.into_inner()),
1623 right: Box::new(other.ir_node.into_inner()),
1624 metadata: self.location.new_node_metadata::<(T, T2)>(),
1625 },
1626 )
1627 }
1628}
1629
1630impl<'a, K, V1, L, B: Boundedness, O, R> Stream<(K, V1), L, B, O, R>
1631where
1632 L: Location<'a>,
1633{
1634 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1635 /// by equi-joining the two streams on the key attribute `K`.
1636 ///
1637 /// # Example
1638 /// ```rust
1639 /// # use hydro_lang::*;
1640 /// # use std::collections::HashSet;
1641 /// # use futures::StreamExt;
1642 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1644 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1645 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1646 /// stream1.join(stream2)
1647 /// # }, |mut stream| async move {
1648 /// // (1, ('a', 'x')), (2, ('b', 'y'))
1649 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1650 /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// ```
1652 pub fn join<V2, O2>(
1653 self,
1654 n: Stream<(K, V2), L, B, O2, R>,
1655 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, R>
1656 where
1657 K: Eq + Hash,
1658 {
1659 check_matching_location(&self.location, &n.location);
1660
1661 Stream::new(
1662 self.location.clone(),
1663 HydroNode::Join {
1664 left: Box::new(self.ir_node.into_inner()),
1665 right: Box::new(n.ir_node.into_inner()),
1666 metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1667 },
1668 )
1669 }
1670
1671 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the two inputs -- i.e. returns the unique items in the
    /// first input whose key does not appear in the second input.
1675 ///
1676 /// # Example
1677 /// ```rust
1678 /// # use hydro_lang::*;
1679 /// # use futures::StreamExt;
1680 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1681 /// let tick = process.tick();
1682 /// let stream = process
1683 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1684 /// .batch(&tick, nondet!(/** test */));
1685 /// let batch = process
1686 /// .source_iter(q!(vec![1, 2]))
1687 /// .batch(&tick, nondet!(/** test */));
1688 /// stream.anti_join(batch).all_ticks()
1689 /// # }, |mut stream| async move {
1690 /// # for w in vec![(3, 'c'), (4, 'd')] {
1691 /// # assert_eq!(stream.next().await.unwrap(), w);
1692 /// # }
    /// # }));
    /// ```
1694 pub fn anti_join<O2, R2>(self, n: Stream<K, L, Bounded, O2, R2>) -> Stream<(K, V1), L, B, O, R>
1695 where
1696 K: Eq + Hash,
1697 {
1698 check_matching_location(&self.location, &n.location);
1699
1700 Stream::new(
1701 self.location.clone(),
1702 HydroNode::AntiJoin {
1703 pos: Box::new(self.ir_node.into_inner()),
1704 neg: Box::new(n.ir_node.into_inner()),
1705 metadata: self.location.new_node_metadata::<(K, V1)>(),
1706 },
1707 )
1708 }
1709}
1710
1711impl<'a, K, V, L: Location<'a>, B: Boundedness, O, R> Stream<(K, V), L, B, O, R> {
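    /// Groups this stream of key-value pairs `(K, V)` into a [`KeyedStream`], using the first
    /// tuple element as the key and the second as the value.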
1712 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1713 KeyedStream {
1714 underlying: self.weakest_ordering(),
1715 _phantom_order: Default::default(),
1716 }
1717 }
1718}
1719
1720impl<'a, K, V, L, B: Boundedness> Stream<(K, V), L, B, TotalOrder, ExactlyOnce>
1721where
1722 K: Eq + Hash,
1723 L: Location<'a>,
1724{
1725 /// A special case of [`Stream::scan`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1726 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1727 /// in the second element are transformed via the `f` combinator.
1728 ///
    /// Unlike [`Stream::fold_keyed`], which only returns the final accumulated value for each key,
    /// `scan_keyed` produces a new stream containing every intermediate accumulated value, paired
    /// with its key.
    ///
    /// The function takes a mutable reference to the per-key accumulator and the current value, and
    /// returns an `Option<U>`. If it returns `Some(value)`, `(key, value)` is emitted to the output
    /// stream. If it returns `None`, nothing is emitted for that element and the accumulator for
    /// that key is discarded, so a later value with the same key starts from a fresh accumulator.
1736 ///
1737 /// # Example
1738 /// ```rust
1739 /// # use hydro_lang::*;
1740 /// # use futures::StreamExt;
1741 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1742 /// process
1743 /// .source_iter(q!(vec![(0, 1), (0, 2), (1, 3), (1, 4)]))
1744 /// .scan_keyed(
1745 /// q!(|| 0),
1746 /// q!(|acc, x| {
1747 /// *acc += x;
1748 /// Some(*acc)
1749 /// }),
1750 /// )
1751 /// # }, |mut stream| async move {
1752 /// // Output: (0, 1), (0, 3), (1, 3), (1, 7)
1753 /// # for w in vec![(0, 1), (0, 3), (1, 3), (1, 7)] {
1754 /// # assert_eq!(stream.next().await.unwrap(), w);
1755 /// # }
1756 /// # }));
1757 /// ```
1758 pub fn scan_keyed<A, U, I, F>(
1759 self,
1760 init: impl IntoQuotedMut<'a, I, L> + Copy,
1761 f: impl IntoQuotedMut<'a, F, L> + Copy,
1762 ) -> Stream<(K, U), L, B, TotalOrder, ExactlyOnce>
1763 where
1764 K: Clone,
1765 I: Fn() -> A + 'a,
1766 F: Fn(&mut A, V) -> Option<U> + 'a,
1767 {
1768 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1769 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1770 self.scan(
1771 q!(|| HashMap::new()),
1772 q!(move |acc, (k, v)| {
1773 let existing_state = acc.entry(k.clone()).or_insert_with(&init);
1774 if let Some(out) = f(existing_state, v) {
1775 Some(Some((k, out)))
1776 } else {
1777 acc.remove(&k);
1778 Some(None)
1779 }
1780 }),
1781 )
1782 .flatten_ordered()
1783 }
1784
    /// Like [`Stream::fold_keyed`], in the spirit of SQL's GROUP BY and aggregation constructs, but the aggregation
    /// function returns a boolean which, when `true`, indicates that the aggregated result for that key is complete
    /// and can be released to downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the
    /// input stream is [`Unbounded`], the outputs of the fold can be processed like normal stream elements.
1789 ///
1790 /// # Example
1791 /// ```rust
1792 /// # use hydro_lang::*;
1793 /// # use futures::StreamExt;
1794 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1795 /// process
1796 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1797 /// .fold_keyed_early_stop(
1798 /// q!(|| 0),
1799 /// q!(|acc, x| {
1800 /// *acc += x;
1801 /// x % 2 == 0
1802 /// }),
1803 /// )
1804 /// # }, |mut stream| async move {
1805 /// // Output: (0, 2), (1, 9)
1806 /// # for w in vec![(0, 2), (1, 9)] {
1807 /// # assert_eq!(stream.next().await.unwrap(), w);
1808 /// # }
1809 /// # }));
1810 /// ```
1811 pub fn fold_keyed_early_stop<A, I, F>(
1812 self,
1813 init: impl IntoQuotedMut<'a, I, L> + Copy,
1814 f: impl IntoQuotedMut<'a, F, L> + Copy,
1815 ) -> Stream<(K, A), L, B, TotalOrder, ExactlyOnce>
1816 where
1817 K: Clone,
1818 I: Fn() -> A + 'a,
1819 F: Fn(&mut A, V) -> bool + 'a,
1820 {
1821 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1822 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1823 self.scan(
1824 q!(|| HashMap::new()),
1825 q!(move |acc, (k, v)| {
1826 let existing_state = acc.entry(k.clone()).or_insert_with(&init);
1827 if f(existing_state, v) {
1828 let out = acc.remove(&k).unwrap();
1829 Some(Some((k, out)))
1830 } else {
1831 Some(None)
1832 }
1833 }),
1834 )
1835 .flatten_ordered()
1836 }
1837}
1838
1839impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1840where
1841 K: Eq + Hash,
1842 L: Location<'a>,
1843{
1844 #[deprecated = "use .into_keyed().fold(...) instead"]
1845 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1846 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1847 /// in the second element are accumulated via the `comb` closure.
1848 ///
1849 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1850 /// to depend on the order of elements in the stream.
1851 ///
1852 /// If the input and output value types are the same and do not require initialization then use
1853 /// [`Stream::reduce_keyed`].
1854 ///
1855 /// # Example
1856 /// ```rust
1857 /// # use hydro_lang::*;
1858 /// # use futures::StreamExt;
1859 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1860 /// let tick = process.tick();
1861 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1862 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1863 /// batch
1864 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1865 /// .all_ticks()
1866 /// # }, |mut stream| async move {
1867 /// // (1, 5), (2, 7)
1868 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1869 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1870 /// # }));
1871 /// ```
1872 pub fn fold_keyed<A, I, F>(
1873 self,
1874 init: impl IntoQuotedMut<'a, I, Tick<L>>,
1875 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1876 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1877 where
1878 I: Fn() -> A + 'a,
1879 F: Fn(&mut A, V) + 'a,
1880 {
1881 self.into_keyed().fold(init, comb).entries()
1882 }
1883
1884 #[deprecated = "use .into_keyed().reduce(...) instead"]
1885 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1886 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1887 /// in the second element are accumulated via the `comb` closure.
1888 ///
1889 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1890 /// to depend on the order of elements in the stream.
1891 ///
1892 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1893 ///
1894 /// # Example
1895 /// ```rust
1896 /// # use hydro_lang::*;
1897 /// # use futures::StreamExt;
1898 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1899 /// let tick = process.tick();
1900 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1901 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1902 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1903 /// # }, |mut stream| async move {
1904 /// // (1, 5), (2, 7)
1905 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1906 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1907 /// # }));
1908 /// ```
1909 pub fn reduce_keyed<F>(
1910 self,
1911 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1912 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1913 where
1914 F: Fn(&mut V, V) + 'a,
1915 {
1916 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1917
1918 Stream::new(
1919 self.location.clone(),
1920 HydroNode::ReduceKeyed {
1921 f,
1922 input: Box::new(self.ir_node.into_inner()),
1923 metadata: self.location.new_node_metadata::<(K, V)>(),
1924 },
1925 )
1926 }
1927}
1928
1929impl<'a, K, V, L, O, R> Stream<(K, V), Tick<L>, Bounded, O, R>
1930where
1931 K: Eq + Hash,
1932 L: Location<'a>,
1933{
1934 #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
1935 /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
1936 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
1937 /// in the second element are accumulated via the `comb` closure.
1938 ///
1939 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1940 /// as there may be non-deterministic duplicates.
1941 ///
1942 /// If the input and output value types are the same and do not require initialization then use
1943 /// [`Stream::reduce_keyed_commutative_idempotent`].
1944 ///
1945 /// # Example
1946 /// ```rust
1947 /// # use hydro_lang::*;
1948 /// # use futures::StreamExt;
1949 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1950 /// let tick = process.tick();
1951 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
1952 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1953 /// batch
1954 /// .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1955 /// .all_ticks()
1956 /// # }, |mut stream| async move {
1957 /// // (1, false), (2, true)
1958 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1959 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1960 /// # }));
1961 /// ```
1962 pub fn fold_keyed_commutative_idempotent<A, I, F>(
1963 self,
1964 init: impl IntoQuotedMut<'a, I, Tick<L>>,
1965 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1966 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1967 where
1968 I: Fn() -> A + 'a,
1969 F: Fn(&mut A, V) + 'a,
1970 {
1971 self.into_keyed()
1972 .fold_commutative_idempotent(init, comb)
1973 .entries()
1974 }
1975
    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
    ///
    /// # Example
1978 /// ```rust
1979 /// # use hydro_lang::*;
1980 /// # use futures::StreamExt;
1981 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1982 /// let tick = process.tick();
1983 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1984 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1985 /// batch.keys().all_ticks()
1986 /// # }, |mut stream| async move {
1987 /// // 1, 2
1988 /// # assert_eq!(stream.next().await.unwrap(), 1);
1989 /// # assert_eq!(stream.next().await.unwrap(), 2);
1990 /// # }));
1991 /// ```
1992 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
1993 self.into_keyed()
1994 .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
1995 .keys()
1996 }
1997
1998 #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
1999 /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2000 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2001 /// in the second element are accumulated via the `comb` closure.
2002 ///
2003 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2004 /// as there may be non-deterministic duplicates.
2005 ///
2006 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2007 ///
2008 /// # Example
2009 /// ```rust
2010 /// # use hydro_lang::*;
2011 /// # use futures::StreamExt;
2012 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2013 /// let tick = process.tick();
2014 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2015 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2016 /// batch
2017 /// .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2018 /// .all_ticks()
2019 /// # }, |mut stream| async move {
2020 /// // (1, false), (2, true)
2021 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2022 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2023 /// # }));
2024 /// ```
2025 pub fn reduce_keyed_commutative_idempotent<F>(
2026 self,
2027 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2028 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2029 where
2030 F: Fn(&mut V, V) + 'a,
2031 {
2032 self.into_keyed()
2033 .reduce_commutative_idempotent(comb)
2034 .entries()
2035 }
2036}
2037
2038impl<'a, K, V, L, O> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2039where
2040 K: Eq + Hash,
2041 L: Location<'a>,
2042{
2043 #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2044 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2045 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2046 /// in the second element are accumulated via the `comb` closure.
2047 ///
2048 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2049 ///
2050 /// If the input and output value types are the same and do not require initialization then use
2051 /// [`Stream::reduce_keyed_commutative`].
2052 ///
2053 /// # Example
2054 /// ```rust
2055 /// # use hydro_lang::*;
2056 /// # use futures::StreamExt;
2057 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2058 /// let tick = process.tick();
2059 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2060 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2061 /// batch
2062 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2063 /// .all_ticks()
2064 /// # }, |mut stream| async move {
2065 /// // (1, 5), (2, 7)
2066 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2067 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2068 /// # }));
2069 /// ```
2070 pub fn fold_keyed_commutative<A, I, F>(
2071 self,
2072 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2073 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2074 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2075 where
2076 I: Fn() -> A + 'a,
2077 F: Fn(&mut A, V) + 'a,
2078 {
2079 self.into_keyed().fold_commutative(init, comb).entries()
2080 }
2081
2082 #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2083 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2084 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2085 /// in the second element are accumulated via the `comb` closure.
2086 ///
2087 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2088 ///
2089 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2090 ///
2091 /// # Example
2092 /// ```rust
2093 /// # use hydro_lang::*;
2094 /// # use futures::StreamExt;
2095 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2096 /// let tick = process.tick();
2097 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2098 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2099 /// batch
2100 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2101 /// .all_ticks()
2102 /// # }, |mut stream| async move {
2103 /// // (1, 5), (2, 7)
2104 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2105 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2106 /// # }));
2107 /// ```
2108 pub fn reduce_keyed_commutative<F>(
2109 self,
2110 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2111 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2112 where
2113 F: Fn(&mut V, V) + 'a,
2114 {
2115 self.into_keyed().reduce_commutative(comb).entries()
2116 }
2117}
2118
2119impl<'a, K, V, L, R> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2120where
2121 K: Eq + Hash,
2122 L: Location<'a>,
2123{
2124 #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2125 /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2126 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2127 /// in the second element are accumulated via the `comb` closure.
2128 ///
2129 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2130 ///
2131 /// If the input and output value types are the same and do not require initialization then use
2132 /// [`Stream::reduce_keyed_idempotent`].
2133 ///
2134 /// # Example
2135 /// ```rust
2136 /// # use hydro_lang::*;
2137 /// # use futures::StreamExt;
2138 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2139 /// let tick = process.tick();
2140 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2141 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2142 /// batch
2143 /// .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2144 /// .all_ticks()
2145 /// # }, |mut stream| async move {
2146 /// // (1, false), (2, true)
2147 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2148 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2149 /// # }));
2150 /// ```
2151 pub fn fold_keyed_idempotent<A, I, F>(
2152 self,
2153 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2154 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2155 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2156 where
2157 I: Fn() -> A + 'a,
2158 F: Fn(&mut A, V) + 'a,
2159 {
2160 self.into_keyed().fold_idempotent(init, comb).entries()
2161 }
2162
2163 #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2164 /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2165 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2166 /// in the second element are accumulated via the `comb` closure.
2167 ///
2168 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2169 ///
2170 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2171 ///
2172 /// # Example
2173 /// ```rust
2174 /// # use hydro_lang::*;
2175 /// # use futures::StreamExt;
2176 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2177 /// let tick = process.tick();
2178 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2179 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2180 /// batch
2181 /// .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2182 /// .all_ticks()
2183 /// # }, |mut stream| async move {
2184 /// // (1, false), (2, true)
2185 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2186 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2187 /// # }));
2188 /// ```
2189 pub fn reduce_keyed_idempotent<F>(
2190 self,
2191 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2192 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2193 where
2194 F: Fn(&mut V, V) + 'a,
2195 {
2196 self.into_keyed().reduce_idempotent(comb).entries()
2197 }
2198}
2199
2200impl<'a, T, L, B: Boundedness, O, R> Stream<T, Atomic<L>, B, O, R>
2201where
2202 L: Location<'a> + NoTick,
2203{
2204 /// Returns a stream corresponding to the latest batch of elements being atomically
2205 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2206 /// the order of the input.
2207 ///
2208 /// # Non-Determinism
2209 /// The batch boundaries are non-deterministic and may change across executions.
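    ///
    /// # Example
    /// A minimal sketch mirroring the surrounding doctests; the batch boundaries are
    /// non-deterministic, but input order is preserved in the released output.
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// process
    ///     .source_iter(q!(vec![1, 2, 3]))
    ///     .atomic(&tick)
    ///     .batch(nondet!(/** test */))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```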
2210 pub fn batch(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2211 Stream::new(
2212 self.location.clone().tick,
2213 HydroNode::Unpersist {
2214 inner: Box::new(self.ir_node.into_inner()),
2215 metadata: self.location.new_node_metadata::<T>(),
2216 },
2217 )
2218 }
2219
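    /// Exits the atomic section, returning this stream at the underlying (non-atomic) location.
    /// Elements pass through unchanged.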
2220 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2221 Stream::new(self.location.tick.l, self.ir_node.into_inner())
2222 }
2223
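    /// Returns the [`Tick`] associated with this atomic section.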
2224 pub fn atomic_source(&self) -> Tick<L> {
2225 self.location.tick.clone()
2226 }
2227}
2228
2229impl<'a, T, L, B: Boundedness, O, R> Stream<T, L, B, O, R>
2230where
2231 L: Location<'a> + NoTick + NoAtomic,
2232{
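    /// Shifts this stream into the atomic section associated with `tick`, so that downstream
    /// operators (such as [`Stream::batch`] on the atomic stream) run inside that section; use
    /// [`Stream::end_atomic`] to exit it.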
2233 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2234 Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
2235 }
2236
    /// Consumes a stream of `Future<T>`, producing a new stream of the resulting `T` outputs.
    /// Future outputs are produced as soon as they resolve, regardless of input arrival order.
2239 ///
2240 /// # Example
2241 /// ```rust
2242 /// # use std::collections::HashSet;
2243 /// # use futures::StreamExt;
2244 /// # use hydro_lang::*;
2245 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2246 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2247 /// .map(q!(|x| async move {
2248 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2249 /// x
2250 /// }))
2251 /// .resolve_futures()
2252 /// # },
2253 /// # |mut stream| async move {
2254 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2255 /// # let mut output = HashSet::new();
2256 /// # for _ in 1..10 {
2257 /// # output.insert(stream.next().await.unwrap());
2258 /// # }
2259 /// # assert_eq!(
2260 /// # output,
2261 /// # HashSet::<i32>::from_iter(1..10)
2262 /// # );
2263 /// # },
    /// # ));
    /// ```
2265 pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>
2266 where
2267 T: Future<Output = T2>,
2268 {
2269 Stream::new(
2270 self.location.clone(),
2271 HydroNode::ResolveFutures {
2272 input: Box::new(self.ir_node.into_inner()),
2273 metadata: self.location.new_node_metadata::<T2>(),
2274 },
2275 )
2276 }
2277
2278 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2279 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2280 /// the order of the input.
2281 ///
2282 /// # Non-Determinism
2283 /// The batch boundaries are non-deterministic and may change across executions.
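    ///
    /// # Example
    /// A minimal sketch mirroring the surrounding doctests; the batch boundaries are
    /// non-deterministic, but input order is preserved across batches.
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// process
    ///     .source_iter(q!(vec![1, 2, 3]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```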
2284 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2285 self.atomic(tick).batch(nondet)
2286 }
2287
2288 /// Given a time interval, returns a stream corresponding to samples taken from the
2289 /// stream roughly at that interval. The output will have elements in the same order
2290 /// as the input, but with arbitrary elements skipped between samples. There is also
2291 /// no guarantee on the exact timing of the samples.
2292 ///
2293 /// # Non-Determinism
2294 /// The output stream is non-deterministic in which elements are sampled, since this
2295 /// is controlled by a clock.
2296 pub fn sample_every(
2297 self,
2298 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2299 nondet: NonDet,
2300 ) -> Stream<T, L, Unbounded, O, AtLeastOnce> {
2301 let samples = self.location.source_interval(interval, nondet);
2302
2303 let tick = self.location.tick();
2304 self.batch(&tick, nondet)
2305 .continue_if(samples.batch(&tick, nondet).first())
2306 .all_ticks()
2307 .weakest_retries()
2308 }
2309
    /// Given a timeout duration, returns an [`Optional`] which will hold a value once the stream
    /// has not emitted any elements for at least that duration.
    ///
    /// # Non-Determinism
    /// Timeouts rely on non-deterministic sampling of the stream, so depending on when samples
    /// take place, a timeout may be raised or missed non-deterministically, and notification of
    /// the timeout may be delayed. There is also no guarantee on how long the [`Optional`] will
    /// retain its value after the timeout is detected, since that depends on when the next
    /// sample is taken.
2319 pub fn timeout(
2320 self,
2321 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2322 nondet: NonDet,
2323 ) -> Optional<(), L, Unbounded> {
2324 let tick = self.location.tick();
2325
2326 let latest_received = self.assume_retries(nondet).fold_commutative(
2327 q!(|| None),
2328 q!(|latest, _| {
2329 *latest = Some(Instant::now());
2330 }),
2331 );
2332
2333 latest_received
2334 .snapshot(&tick, nondet)
2335 .filter_map(q!(move |latest_received| {
2336 if let Some(latest_received) = latest_received {
2337 if Instant::now().duration_since(latest_received) > duration {
2338 Some(())
2339 } else {
2340 None
2341 }
2342 } else {
2343 Some(())
2344 }
2345 }))
2346 .latest()
2347 }
2348}
2349
2350impl<'a, F, T, L, B: Boundedness, O, R> Stream<F, L, B, O, R>
2351where
2352 L: Location<'a> + NoTick + NoAtomic,
2353 F: Future<Output = T>,
2354{
    /// Consumes a stream of `Future<T>`, producing a new stream of the resulting `T` outputs.
    /// Future outputs are produced in the same order as the input stream.
2357 ///
2358 /// # Example
2359 /// ```rust
2360 /// # use std::collections::HashSet;
2361 /// # use futures::StreamExt;
2362 /// # use hydro_lang::*;
2363 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2364 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2365 /// .map(q!(|x| async move {
2366 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2367 /// x
2368 /// }))
2369 /// .resolve_futures_ordered()
2370 /// # },
2371 /// # |mut stream| async move {
2372 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2373 /// # let mut output = Vec::new();
2374 /// # for _ in 1..10 {
2375 /// # output.push(stream.next().await.unwrap());
2376 /// # }
2377 /// # assert_eq!(
2378 /// # output,
2379 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2380 /// # );
2381 /// # },
    /// # ));
    /// ```
2383 pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2384 Stream::new(
2385 self.location.clone(),
2386 HydroNode::ResolveFuturesOrdered {
2387 input: Box::new(self.ir_node.into_inner()),
2388 metadata: self.location.new_node_metadata::<T>(),
2389 },
2390 )
2391 }
2392}
2393
2394impl<'a, T, L, B: Boundedness, O, R> Stream<T, L, B, O, R>
2395where
2396 L: Location<'a> + NoTick,
2397{
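    /// Invokes the closure `f` on every element of the stream as elements arrive. This is a
    /// terminal operator: it registers a leaf in the dataflow graph and consumes the stream,
    /// producing no output.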
2398 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2399 let f = f.splice_fn1_ctx(&self.location).into();
2400 let metadata = self.location.new_node_metadata::<T>();
2401 self.location
2402 .flow_state()
2403 .borrow_mut()
2404 .leaves
2405 .as_mut()
2406 .expect(FLOW_USED_MESSAGE)
2407 .push(HydroLeaf::ForEach {
2408 input: Box::new(HydroNode::Unpersist {
2409 inner: Box::new(self.ir_node.into_inner()),
2410 metadata: metadata.clone(),
2411 }),
2412 f,
2413 metadata,
2414 });
2415 }
2416
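    /// Forwards every element of the stream into the provided [`futures::Sink`] (for example, a
    /// channel sender). Like [`Stream::for_each`], this is a terminal operator that consumes the
    /// stream.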
2417 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2418 where
2419 S: 'a + futures::Sink<T> + Unpin,
2420 {
2421 self.location
2422 .flow_state()
2423 .borrow_mut()
2424 .leaves
2425 .as_mut()
2426 .expect(FLOW_USED_MESSAGE)
2427 .push(HydroLeaf::DestSink {
2428 sink: sink.splice_typed_ctx(&self.location).into(),
2429 input: Box::new(self.ir_node.into_inner()),
2430 metadata: self.location.new_node_metadata::<T>(),
2431 });
2432 }
2433}
2434
2435impl<'a, T, L, O, R> Stream<T, Tick<L>, Bounded, O, R>
2436where
2437 L: Location<'a>,
2438{
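    /// Releases the elements of each tick's batch into a single [`Unbounded`] stream at the
    /// enclosing location, tick by tick.
    ///
    /// # Example
    /// A minimal sketch mirroring the surrounding doctests; every batched element is released.
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// process
    ///     .source_iter(q!(vec![1, 2, 3]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .map(q!(|x| x * 10))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![10, 20, 30] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```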
2439 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2440 Stream::new(
2441 self.location.outer().clone(),
2442 HydroNode::Persist {
2443 inner: Box::new(self.ir_node.into_inner()),
2444 metadata: self.location.new_node_metadata::<T>(),
2445 },
2446 )
2447 }
2448
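    /// Like [`Stream::all_ticks`], but the released stream remains inside the atomic section of
    /// this tick; use [`Stream::end_atomic`] to leave the section.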
2449 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2450 Stream::new(
2451 Atomic {
2452 tick: self.location.clone(),
2453 },
2454 HydroNode::Persist {
2455 inner: Box::new(self.ir_node.into_inner()),
2456 metadata: self.location.new_node_metadata::<T>(),
2457 },
2458 )
2459 }
2460
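    /// Persists the elements of this tick's batch so that they are replayed on every subsequent
    /// tick, accumulating the union of all batches seen so far.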
2461 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2462 where
2463 T: Clone,
2464 {
2465 Stream::new(
2466 self.location.clone(),
2467 HydroNode::Persist {
2468 inner: Box::new(self.ir_node.into_inner()),
2469 metadata: self.location.new_node_metadata::<T>(),
2470 },
2471 )
2472 }
2473
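    /// Shifts this stream forward by one tick: elements materialized in the current tick become
    /// available in the next tick instead.
    ///
    /// # Example
    /// A minimal sketch mirroring the surrounding doctests; every element is still released,
    /// just one tick later.
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// process
    ///     .source_iter(q!(vec![1, 2, 3]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```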
2474 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2475 Stream::new(
2476 self.location.clone(),
2477 HydroNode::DeferTick {
2478 input: Box::new(self.ir_node.into_inner()),
2479 metadata: self.location.new_node_metadata::<T>(),
2480 },
2481 )
2482 }
2483
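    /// Yields only the elements of this stream that are new in the current tick relative to the
    /// previous tick (the inverse of [`Stream::persist`]).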
2484 pub fn delta(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2485 Stream::new(
2486 self.location.clone(),
2487 HydroNode::Delta {
2488 inner: Box::new(self.ir_node.into_inner()),
2489 metadata: self.location.new_node_metadata::<T>(),
2490 },
2491 )
2492 }
2493}
2494
2495#[cfg(test)]
2496mod tests {
2497 use futures::StreamExt;
2498 use hydro_deploy::Deployment;
2499 use serde::{Deserialize, Serialize};
2500 use stageleft::q;
2501
2502 use crate::FlowBuilder;
2503 use crate::location::Location;
2504
2505 struct P1 {}
2506 struct P2 {}
2507
2508 #[derive(Serialize, Deserialize, Debug)]
2509 struct SendOverNetwork {
2510 n: u32,
2511 }
2512
2513 #[tokio::test]
2514 async fn first_ten_distributed() {
2515 let mut deployment = Deployment::new();
2516
2517 let flow = FlowBuilder::new();
2518 let first_node = flow.process::<P1>();
2519 let second_node = flow.process::<P2>();
2520 let external = flow.external::<P2>();
2521
2522 let numbers = first_node.source_iter(q!(0..10));
2523 let out_port = numbers
2524 .map(q!(|n| SendOverNetwork { n }))
2525 .send_bincode(&second_node)
2526 .send_bincode_external(&external);
2527
2528 let nodes = flow
2529 .with_process(&first_node, deployment.Localhost())
2530 .with_process(&second_node, deployment.Localhost())
2531 .with_external(&external, deployment.Localhost())
2532 .deploy(&mut deployment);
2533
2534 deployment.deploy().await.unwrap();
2535
2536 let mut external_out = nodes.connect_source_bincode(out_port).await;
2537
2538 deployment.start().await.unwrap();
2539
2540 for i in 0..10 {
2541 assert_eq!(external_out.next().await.unwrap().n, i);
2542 }
2543 }
2544
2545 #[tokio::test]
2546 async fn first_cardinality() {
2547 let mut deployment = Deployment::new();
2548
2549 let flow = FlowBuilder::new();
2550 let node = flow.process::<()>();
2551 let external = flow.external::<()>();
2552
2553 let node_tick = node.tick();
2554 let count = node_tick
2555 .singleton(q!([1, 2, 3]))
2556 .into_stream()
2557 .flatten_ordered()
2558 .first()
2559 .into_stream()
2560 .count()
2561 .all_ticks()
2562 .send_bincode_external(&external);
2563
2564 let nodes = flow
2565 .with_process(&node, deployment.Localhost())
2566 .with_external(&external, deployment.Localhost())
2567 .deploy(&mut deployment);
2568
2569 deployment.deploy().await.unwrap();
2570
2571 let mut external_out = nodes.connect_source_bincode(count).await;
2572
2573 deployment.start().await.unwrap();
2574
2575 assert_eq!(external_out.next().await.unwrap(), 1);
2576 }
2577}