//! hydro_lang/stream.rs
1use std::cell::RefCell;
2use std::hash::Hash;
3use std::marker::PhantomData;
4use std::ops::Deref;
5use std::rc::Rc;
6
7use dfir_rs::bytes::Bytes;
8use dfir_rs::futures;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12use syn::parse_quote;
13use tokio::time::Instant;
14
15use crate::builder::FLOW_USED_MESSAGE;
16use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
17use crate::ir::{DebugInstantiate, HydroLeaf, HydroNode, TeeNode};
18use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort};
19use crate::location::tick::{Atomic, NoAtomic};
20use crate::location::{
21 CanSend, ExternalProcess, Location, LocationId, NoTick, Tick, check_matching_location,
22};
23use crate::staging_util::get_this_crate;
24use crate::{Bounded, Cluster, ClusterId, Optional, Singleton, Unbounded};
25
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is a zero-sized marker type used only as the `Order` type parameter
/// of [`Stream`]; it is never instantiated at runtime.
pub struct TotalOrder {}
30
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// Like [`TotalOrder`], this is a zero-sized marker type used only as the
/// `Order` type parameter of [`Stream`].
pub struct NoOrder {}
37
/// Helper trait for determining the weakest of two orderings.
///
/// Implemented only for the ordering marker types ([`TotalOrder`] and
/// [`NoOrder`]); the trait is sealed so downstream crates cannot add
/// additional orderings.
#[sealed::sealed]
pub trait MinOrder<Other> {
    /// The weaker of the two orderings.
    type Min;
}
44
// Reflexive case: the minimum of an ordering and itself is that same ordering.
#[sealed::sealed]
impl<T> MinOrder<T> for T {
    type Min = T;
}
49
// `TotalOrder` combined with `NoOrder` weakens to `NoOrder`.
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}
54
// Symmetric case: `NoOrder` combined with `TotalOrder` is still `NoOrder`.
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}
59
/// An ordered sequence stream of elements of type `T`.
///
/// Type Parameters:
/// - `T`: the type of elements in the stream
/// - `L`: the location where the stream is being materialized
/// - `B`: the boundedness of the stream, which is either [`Bounded`]
///   or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`]
///   or [`NoOrder`] (default is [`TotalOrder`])
pub struct Stream<T, L, B, Order = TotalOrder> {
    // The location (process / cluster / tick) at which this stream lives.
    location: L,
    // The dataflow IR node that produces this stream. Interior mutability is
    // required so that `Clone` can retroactively rewrite the node into a
    // shared `Tee` (see the `Clone` impl below).
    pub(crate) ir_node: RefCell<HydroNode>,

    _phantom: PhantomData<(T, L, B, Order)>,
}
75
76impl<'a, T, L: Location<'a>, O> From<Stream<T, L, Bounded, O>> for Stream<T, L, Unbounded, O> {
77 fn from(stream: Stream<T, L, Bounded, O>) -> Stream<T, L, Unbounded, O> {
78 Stream {
79 location: stream.location,
80 ir_node: stream.ir_node,
81 _phantom: PhantomData,
82 }
83 }
84}
85
86impl<'a, T, L: Location<'a>, B> From<Stream<T, L, B, TotalOrder>> for Stream<T, L, B, NoOrder> {
87 fn from(stream: Stream<T, L, B, TotalOrder>) -> Stream<T, L, B, NoOrder> {
88 Stream {
89 location: stream.location,
90 ir_node: stream.ir_node,
91 _phantom: PhantomData,
92 }
93 }
94}
95
impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
    /// Returns the [`LocationId`] of the location this stream is materialized at.
    fn location_kind(&self) -> LocationId {
        self.location.id()
    }
}
101
impl<'a, T, L: Location<'a>, Order> DeferTick for Stream<T, Tick<L>, Bounded, Order> {
    // Delegates to the inherent `Stream::defer_tick` method (defined elsewhere
    // in this file).
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
107
108impl<'a, T, L: Location<'a>, Order> CycleCollection<'a, TickCycleMarker>
109 for Stream<T, Tick<L>, Bounded, Order>
110{
111 type Location = Tick<L>;
112
113 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
114 let location_id = location.id();
115 Stream::new(
116 location.clone(),
117 HydroNode::CycleSource {
118 ident,
119 location_kind: location_id,
120 metadata: location.new_node_metadata::<T>(),
121 },
122 )
123 }
124}
125
impl<'a, T, L: Location<'a>, Order> CycleComplete<'a, TickCycleMarker>
    for Stream<T, Tick<L>, Bounded, Order>
{
    /// Completes a tick-local cycle by registering this stream as the sink
    /// feeding the `CycleSource` previously created under `ident`.
    ///
    /// Panics if this stream's location does not match `expected_location`.
    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
        assert_eq!(
            self.location.id(),
            expected_location,
            "locations do not match"
        );
        // Append a `CycleSink` leaf to the flow graph; `leaves` is `None` once
        // the flow has been consumed, hence the `expect`.
        self.location
            .flow_state()
            .borrow_mut()
            .leaves
            .as_mut()
            .expect(FLOW_USED_MESSAGE)
            .push(HydroLeaf::CycleSink {
                ident,
                location_kind: self.location_kind(),
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            });
    }
}
149
150impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleCollection<'a, ForwardRefMarker>
151 for Stream<T, L, B, Order>
152{
153 type Location = L;
154
155 fn create_source(ident: syn::Ident, location: L) -> Self {
156 let location_id = location.id();
157
158 Stream::new(
159 location.clone(),
160 HydroNode::Persist {
161 inner: Box::new(HydroNode::CycleSource {
162 ident,
163 location_kind: location_id,
164 metadata: location.new_node_metadata::<T>(),
165 }),
166 metadata: location.new_node_metadata::<T>(),
167 },
168 )
169 }
170}
171
impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleComplete<'a, ForwardRefMarker>
    for Stream<T, L, B, Order>
{
    /// Completes a forward reference by registering this stream as the sink.
    ///
    /// Top-level streams are represented persisted (see `create_source`), so
    /// the input is unwrapped with `Unpersist` before being sunk.
    ///
    /// Panics if this stream's location does not match `expected_location`.
    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
        assert_eq!(
            self.location.id(),
            expected_location,
            "locations do not match"
        );
        let metadata = self.location.new_node_metadata::<T>();
        self.location
            .flow_state()
            .borrow_mut()
            .leaves
            .as_mut()
            .expect(FLOW_USED_MESSAGE)
            .push(HydroLeaf::CycleSink {
                ident,
                location_kind: self.location_kind(),
                input: Box::new(HydroNode::Unpersist {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: metadata.clone(),
                }),
                metadata,
            });
    }
}
199
impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
    /// Internal constructor wrapping an IR node and its location into a typed
    /// stream. The type parameters (`T`, `B`, `Order`) are asserted by the
    /// caller, not checked against the node.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        Stream {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
}
209
impl<'a, T: Clone, L: Location<'a>, B, Order> Clone for Stream<T, L, B, Order> {
    fn clone(&self) -> Self {
        // If this stream's node is not already a `Tee`, rewrite it in place:
        // the original node is moved behind a shared `Rc` so that this stream
        // and every clone read from the same upstream computation exactly once.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata::<T>(),
            };
        }

        // The node is now guaranteed to be a `Tee`; hand the clone a new `Tee`
        // handle sharing the same inner node.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
235
236impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
237 /// Produces a stream based on invoking `f` on each element in order.
238 /// If you do not want to modify the stream and instead only want to view
239 /// each item use [`Stream::inspect`] instead.
240 ///
241 /// # Example
242 /// ```rust
243 /// # use hydro_lang::*;
244 /// # use dfir_rs::futures::StreamExt;
245 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
246 /// let words = process.source_iter(q!(vec!["hello", "world"]));
247 /// words.map(q!(|x| x.to_uppercase()))
248 /// # }, |mut stream| async move {
249 /// # for w in vec!["HELLO", "WORLD"] {
250 /// # assert_eq!(stream.next().await.unwrap(), w);
251 /// # }
252 /// # }));
253 /// ```
254 pub fn map<U, F: Fn(T) -> U + 'a>(
255 self,
256 f: impl IntoQuotedMut<'a, F, L>,
257 ) -> Stream<U, L, B, Order> {
258 let f = f.splice_fn1_ctx(&self.location).into();
259 Stream::new(
260 self.location.clone(),
261 HydroNode::Map {
262 f,
263 input: Box::new(self.ir_node.into_inner()),
264 metadata: self.location.new_node_metadata::<U>(),
265 },
266 )
267 }
268
269 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
270 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
271 /// for the output type `U` must produce items in a **deterministic** order.
272 ///
273 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
274 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
275 ///
276 /// # Example
277 /// ```rust
278 /// # use hydro_lang::*;
279 /// # use dfir_rs::futures::StreamExt;
280 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
281 /// process
282 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
283 /// .flat_map_ordered(q!(|x| x))
284 /// # }, |mut stream| async move {
285 /// // 1, 2, 3, 4
286 /// # for w in (1..5) {
287 /// # assert_eq!(stream.next().await.unwrap(), w);
288 /// # }
289 /// # }));
290 /// ```
291 pub fn flat_map_ordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
292 self,
293 f: impl IntoQuotedMut<'a, F, L>,
294 ) -> Stream<U, L, B, Order> {
295 let f = f.splice_fn1_ctx(&self.location).into();
296 Stream::new(
297 self.location.clone(),
298 HydroNode::FlatMap {
299 f,
300 input: Box::new(self.ir_node.into_inner()),
301 metadata: self.location.new_node_metadata::<U>(),
302 },
303 )
304 }
305
306 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
307 /// for the output type `U` to produce items in any order.
308 ///
309 /// # Example
310 /// ```rust
311 /// # use hydro_lang::*;
312 /// # use dfir_rs::futures::StreamExt;
313 /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder>(|process| {
314 /// process
315 /// .source_iter(q!(vec![
316 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
317 /// std::collections::HashSet::from_iter(vec![3, 4]),
318 /// ]))
319 /// .flat_map_unordered(q!(|x| x))
320 /// # }, |mut stream| async move {
321 /// // 1, 2, 3, 4, but in no particular order
322 /// # let mut results = Vec::new();
323 /// # for w in (1..5) {
324 /// # results.push(stream.next().await.unwrap());
325 /// # }
326 /// # results.sort();
327 /// # assert_eq!(results, vec![1, 2, 3, 4]);
328 /// # }));
329 /// ```
330 pub fn flat_map_unordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
331 self,
332 f: impl IntoQuotedMut<'a, F, L>,
333 ) -> Stream<U, L, B, NoOrder> {
334 let f = f.splice_fn1_ctx(&self.location).into();
335 Stream::new(
336 self.location.clone(),
337 HydroNode::FlatMap {
338 f,
339 input: Box::new(self.ir_node.into_inner()),
340 metadata: self.location.new_node_metadata::<U>(),
341 },
342 )
343 }
344
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, Order>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is flat-mapping with the identity function.
        self.flat_map_ordered(q!(|d| d))
    }
371
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is flat-mapping with the identity function.
        self.flat_map_unordered(q!(|d| d))
    }
401
402 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
403 /// `f`, preserving the order of the elements.
404 ///
405 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
406 /// not modify or take ownership of the values. If you need to modify the values while filtering
407 /// use [`Stream::filter_map`] instead.
408 ///
409 /// # Example
410 /// ```rust
411 /// # use hydro_lang::*;
412 /// # use dfir_rs::futures::StreamExt;
413 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
414 /// process
415 /// .source_iter(q!(vec![1, 2, 3, 4]))
416 /// .filter(q!(|&x| x > 2))
417 /// # }, |mut stream| async move {
418 /// // 3, 4
419 /// # for w in (3..5) {
420 /// # assert_eq!(stream.next().await.unwrap(), w);
421 /// # }
422 /// # }));
423 /// ```
424 pub fn filter<F: Fn(&T) -> bool + 'a>(
425 self,
426 f: impl IntoQuotedMut<'a, F, L>,
427 ) -> Stream<T, L, B, Order> {
428 let f = f.splice_fn1_borrow_ctx(&self.location).into();
429 Stream::new(
430 self.location.clone(),
431 HydroNode::Filter {
432 f,
433 input: Box::new(self.ir_node.into_inner()),
434 metadata: self.location.new_node_metadata::<T>(),
435 },
436 )
437 }
438
    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # for w in (1..3) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, Order> {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<U>(),
            },
        )
    }
469
    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
    /// where `x` is the final value of `other`, a bounded [`Optional`] (a
    /// [`Singleton`] also converts into one, as in the example below).
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = unsafe {
    ///     process
    ///         .source_iter(q!(vec![1, 2, 3, 4]))
    ///         .tick_batch(&tick)
    /// };
    /// let count = batch.clone().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 4), (2, 4), (3, 4), (4, 4)
    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn cross_singleton<O>(
        self,
        other: impl Into<Optional<O, L, Bounded>>,
    ) -> Stream<(T, O), L, B, Order>
    where
        O: Clone,
    {
        let other: Optional<O, L, Bounded> = other.into();
        // Both operands must be materialized at the same location.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(T, O)>(),
            },
        )
    }
511
    /// Allow this stream through if the argument (a Bounded Optional) is non-empty, otherwise the output is empty.
    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, Order> {
        // Pair every element with the unit-valued signal (an empty signal
        // yields no pairs, i.e. an empty output), then drop the signal half.
        self.cross_singleton(signal.map(q!(|_u| ())))
            .map(q!(|(d, _signal)| d))
    }
517
    /// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty.
    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, Order> {
        // Invert the gate: the count of `other` viewed as a stream is 0 exactly
        // when it is empty, which produces a present signal for `continue_if`.
        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
    }
522
    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
    /// tupled pairs in a non-deterministic order.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use std::collections::HashSet;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
    /// stream1.cross_product(stream2)
    /// # }, |mut stream| async move {
    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// ```
    pub fn cross_product<O>(self, other: Stream<O, L, B, Order>) -> Stream<(T, O), L, B, NoOrder>
    where
        T: Clone,
        O: Clone,
    {
        // Both operands must be materialized at the same location.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossProduct {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(T, O)>(),
            },
        )
    }
556
    /// Takes one stream as input and filters out any duplicate occurrences. The output
    /// contains all unique values from the input.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn unique(self) -> Stream<T, L, B, Order>
    where
        T: Eq + Hash,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Unique {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }
584
    /// Outputs everything in this stream that is *not* contained in the `other` stream.
    ///
    /// The `other` stream must be [`Bounded`], since this function will wait until
    /// all its elements are available before producing any output.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = unsafe {
    ///     process
    ///         .source_iter(q!(vec![ 1, 2, 3, 4 ]))
    ///         .tick_batch(&tick)
    /// };
    /// let batch = unsafe {
    ///     process
    ///         .source_iter(q!(vec![1, 2]))
    ///         .tick_batch(&tick)
    /// };
    /// stream.filter_not_in(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_not_in<O2>(self, other: Stream<T, L, Bounded, O2>) -> Stream<T, L, Bounded, Order>
    where
        T: Eq + Hash,
    {
        // Both operands must be materialized at the same location.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            // Anti-join: `pos` elements pass through unless present in `neg`.
            HydroNode::Difference {
                pos: Box::new(self.ir_node.into_inner()),
                neg: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }
626
    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each item. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let nums = process.source_iter(q!(vec![1, 2]));
    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn inspect<F: Fn(&T) + 'a>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<T, L, B, Order> {
        let f = f.splice_fn1_borrow_ctx(&self.location).into();

        if L::is_top_level() {
            // Top-level streams are represented persisted; unwrap with
            // `Unpersist` so `f` runs once per new element, then re-persist.
            Stream::new(
                self.location.clone(),
                HydroNode::Persist {
                    inner: Box::new(HydroNode::Inspect {
                        f,
                        input: Box::new(HydroNode::Unpersist {
                            inner: Box::new(self.ir_node.into_inner()),
                            metadata: self.location.new_node_metadata::<T>(),
                        }),
                        metadata: self.location.new_node_metadata::<T>(),
                    }),
                    metadata: self.location.new_node_metadata::<T>(),
                },
            )
        } else {
            Stream::new(
                self.location.clone(),
                HydroNode::Inspect {
                    f,
                    input: Box::new(self.ir_node.into_inner()),
                    metadata: self.location.new_node_metadata::<T>(),
                },
            )
        }
    }
677
    /// Explicitly "casts" the stream to a type with a different ordering
    /// guarantee. Useful in unsafe code where the ordering cannot be proven
    /// by the type-system.
    ///
    /// # Safety
    /// This function is used as an escape hatch, and any mistakes in the
    /// provided ordering guarantee will propagate into the guarantees
    /// for the rest of the program.
    ///
    /// # Example
    // TODO: more sensible example code after Shadaj merges
    /// ```rust
    /// # use hydro_lang::*;
    /// # use std::collections::HashSet;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let nums = process.source_iter(q!({
    ///     let now = std::time::SystemTime::now();
    ///     match now.elapsed().unwrap().as_secs() % 2 {
    ///         0 => vec![5, 4, 3, 2, 1],
    ///         _ => vec![1, 2, 3, 4, 5],
    ///     }
    ///     .into_iter()
    /// }));
    /// // despite being generated by `source_iter`, the order of `nums` across runs is non-deterministic
    /// let stream = unsafe { nums.assume_ordering::<NoOrder>() };
    /// stream
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4, 5] {
    /// #     assert!((1..=5).contains(&stream.next().await.unwrap()));
    /// # }
    /// # }));
    /// ```
    pub unsafe fn assume_ordering<O>(self) -> Stream<T, L, B, O> {
        // Pure type-level cast: the IR node is reused unchanged.
        Stream::new(self.location, self.ir_node.into_inner())
    }
714}
715
impl<'a, T, L: Location<'a>, B, Order> Stream<&T, L, B, Order> {
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, Order>
    where
        T: Clone,
    {
        self.map(q!(|d| d.clone()))
    }
}
739
740impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order>
741where
742 Order: MinOrder<NoOrder, Min = NoOrder>,
743{
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // 10
    /// # assert_eq!(stream.next().await.unwrap(), 10);
    /// # }));
    /// ```
    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L>,
    ) -> Singleton<A, L, B> {
        let init = init.splice_fn0_ctx(&self.location).into();
        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

        let mut core = HydroNode::Fold {
            init,
            acc: comb,
            input: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata::<A>(),
        };

        if L::is_top_level() {
            // top-level (possibly unbounded) singletons are represented as
            // a stream which produces all values from all ticks every tick,
            // so Unpersist will always give the latest aggregation
            core = HydroNode::Persist {
                inner: Box::new(core),
                metadata: self.location.new_node_metadata::<A>(),
            };
        }

        Singleton::new(self.location, core)
    }
793
    /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
    /// reference, so that it can be modified in place.
    ///
    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch
    ///     .reduce_commutative(q!(|curr, new| *curr += new))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // 10
    /// # assert_eq!(stream.next().await.unwrap(), 10);
    /// # }));
    /// ```
    pub fn reduce_commutative<F: Fn(&mut T, T) + 'a>(
        self,
        comb: impl IntoQuotedMut<'a, F, L>,
    ) -> Optional<T, L, B> {
        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
        let mut core = HydroNode::Reduce {
            f,
            input: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata::<T>(),
        };

        if L::is_top_level() {
            // See `fold_commutative`: top-level aggregations are persisted so
            // downstream operators always observe the latest value.
            core = HydroNode::Persist {
                inner: Box::new(core),
                metadata: self.location.new_node_metadata::<T>(),
            };
        }

        Optional::new(self.location, core)
    }
837
    /// Computes the maximum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.max().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// ```
    pub fn max(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // Strict `>` means an earlier-seen element is kept on ties.
        self.reduce_commutative(q!(|curr, new| {
            if new > *curr {
                *curr = new;
            }
        }))
    }
865
    /// Computes the maximum element in the stream as an [`Optional`], where the
    /// maximum is determined according to the `key` function. The [`Optional`] will
    /// be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.max_by_key(q!(|x| -x)).all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// ```
    pub fn max_by_key<K: Ord, F: Fn(&T) -> K + 'a>(
        self,
        key: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Optional<T, L, B> {
        let f = key.splice_fn1_borrow_ctx(&self.location);

        // Splice the user's key function once into a generated reducer that
        // compares keys and keeps the element with the larger key (strict `>`,
        // so earlier-seen elements win ties).
        let wrapped: syn::Expr = parse_quote!({
            let key_fn = #f;
            move |curr, new| {
                if key_fn(&new) > key_fn(&*curr) {
                    *curr = new;
                }
            }
        });

        let mut core = HydroNode::Reduce {
            f: wrapped.into(),
            input: Box::new(self.ir_node.into_inner()),
            metadata: self.location.new_node_metadata::<T>(),
        };

        if L::is_top_level() {
            // See `fold_commutative`: top-level aggregations are persisted so
            // downstream operators always observe the latest value.
            core = HydroNode::Persist {
                inner: Box::new(core),
                metadata: self.location.new_node_metadata::<T>(),
            };
        }

        Optional::new(self.location, core)
    }
914
    /// Computes the minimum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.min().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// ```
    pub fn min(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // Strict `<` means an earlier-seen element is kept on ties.
        self.reduce_commutative(q!(|curr, new| {
            if new < *curr {
                *curr = new;
            }
        }))
    }
942
    /// Computes the number of elements in the stream as a [`Singleton`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = unsafe { numbers.tick_batch(&tick) };
    /// batch.count().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// ```
    pub fn count(self) -> Singleton<usize, L, B> {
        // Counting is a commutative fold: order of arrival does not matter.
        self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
    }
962}
963
964impl<'a, T, L: Location<'a>, B> Stream<T, L, B, TotalOrder> {
    /// Returns a stream with the current count tupled with each element in the input stream.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder>(|process| {
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// numbers.enumerate()
    /// # }, |mut stream| async move {
    /// // (0, 1), (1, 2), (2, 3), (3, 4)
    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder> {
        if L::is_top_level() {
            // Top-level streams are persisted; unwrap before enumerating so
            // indices track newly-arriving elements, then re-persist.
            // NOTE(review): `is_static: true` here appears to tie the counter
            // to the persisted (cross-tick) representation — confirm against
            // the `HydroNode::Enumerate` lowering.
            Stream::new(
                self.location.clone(),
                HydroNode::Persist {
                    inner: Box::new(HydroNode::Enumerate {
                        is_static: true,
                        input: Box::new(HydroNode::Unpersist {
                            inner: Box::new(self.ir_node.into_inner()),
                            metadata: self.location.new_node_metadata::<T>(),
                        }),
                        metadata: self.location.new_node_metadata::<(usize, T)>(),
                    }),
                    metadata: self.location.new_node_metadata::<(usize, T)>(),
                },
            )
        } else {
            Stream::new(
                self.location.clone(),
                HydroNode::Enumerate {
                    is_static: false,
                    input: Box::new(self.ir_node.into_inner()),
                    metadata: self.location.new_node_metadata::<(usize, T)>(),
                },
            )
        }
    }
1009
1010 /// Computes the first element in the stream as an [`Optional`], which
1011 /// will be empty until the first element in the input arrives.
1012 ///
1013 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1014 /// re-ordering of elements may cause the first element to change.
1015 ///
1016 /// # Example
1017 /// ```rust
1018 /// # use hydro_lang::*;
1019 /// # use dfir_rs::futures::StreamExt;
1020 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1021 /// let tick = process.tick();
1022 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1023 /// let batch = unsafe { numbers.tick_batch(&tick) };
1024 /// batch.first().all_ticks()
1025 /// # }, |mut stream| async move {
1026 /// // 1
1027 /// # assert_eq!(stream.next().await.unwrap(), 1);
1028 /// # }));
1029 /// ```
1030 pub fn first(self) -> Optional<T, L, B> {
1031 Optional::new(self.location, self.ir_node.into_inner())
1032 }
1033
1034 /// Computes the last element in the stream as an [`Optional`], which
1035 /// will be empty until an element in the input arrives.
1036 ///
1037 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1038 /// re-ordering of elements may cause the last element to change.
1039 ///
1040 /// # Example
1041 /// ```rust
1042 /// # use hydro_lang::*;
1043 /// # use dfir_rs::futures::StreamExt;
1044 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1045 /// let tick = process.tick();
1046 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1047 /// let batch = unsafe { numbers.tick_batch(&tick) };
1048 /// batch.last().all_ticks()
1049 /// # }, |mut stream| async move {
1050 /// // 4
1051 /// # assert_eq!(stream.next().await.unwrap(), 4);
1052 /// # }));
1053 /// ```
1054 pub fn last(self) -> Optional<T, L, B> {
1055 self.reduce(q!(|curr, new| *curr = new))
1056 }
1057
1058 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1059 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1060 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1061 ///
1062 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1063 /// to depend on the order of elements in the stream.
1064 ///
1065 /// # Example
1066 /// ```rust
1067 /// # use hydro_lang::*;
1068 /// # use dfir_rs::futures::StreamExt;
1069 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1070 /// let tick = process.tick();
1071 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1072 /// let batch = unsafe { words.tick_batch(&tick) };
1073 /// batch
1074 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1075 /// .all_ticks()
1076 /// # }, |mut stream| async move {
1077 /// // "HELLOWORLD"
1078 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1079 /// # }));
1080 /// ```
1081 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1082 self,
1083 init: impl IntoQuotedMut<'a, I, L>,
1084 comb: impl IntoQuotedMut<'a, F, L>,
1085 ) -> Singleton<A, L, B> {
1086 let init = init.splice_fn0_ctx(&self.location).into();
1087 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1088
1089 let mut core = HydroNode::Fold {
1090 init,
1091 acc: comb,
1092 input: Box::new(self.ir_node.into_inner()),
1093 metadata: self.location.new_node_metadata::<A>(),
1094 };
1095
1096 if L::is_top_level() {
1097 // top-level (possibly unbounded) singletons are represented as
1098 // a stream which produces all values from all ticks every tick,
1099 // so Unpersist will always give the lastest aggregation
1100 core = HydroNode::Persist {
1101 inner: Box::new(core),
1102 metadata: self.location.new_node_metadata::<A>(),
1103 };
1104 }
1105
1106 Singleton::new(self.location, core)
1107 }
1108
1109 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1110 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1111 /// until the first element in the input arrives.
1112 ///
1113 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1114 /// to depend on the order of elements in the stream.
1115 ///
1116 /// # Example
1117 /// ```rust
1118 /// # use hydro_lang::*;
1119 /// # use dfir_rs::futures::StreamExt;
1120 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1121 /// let tick = process.tick();
1122 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1123 /// let batch = unsafe { words.tick_batch(&tick) };
1124 /// batch
1125 /// .map(q!(|x| x.to_string()))
1126 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1127 /// .all_ticks()
1128 /// # }, |mut stream| async move {
1129 /// // "HELLOWORLD"
1130 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1131 /// # }));
1132 /// ```
1133 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1134 self,
1135 comb: impl IntoQuotedMut<'a, F, L>,
1136 ) -> Optional<T, L, B> {
1137 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1138 let mut core = HydroNode::Reduce {
1139 f,
1140 input: Box::new(self.ir_node.into_inner()),
1141 metadata: self.location.new_node_metadata::<T>(),
1142 };
1143
1144 if L::is_top_level() {
1145 core = HydroNode::Persist {
1146 inner: Box::new(core),
1147 metadata: self.location.new_node_metadata::<T>(),
1148 };
1149 }
1150
1151 Optional::new(self.location, core)
1152 }
1153}
1154
1155impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O> Stream<T, L, Unbounded, O> {
1156 /// Produces a new stream that interleaves the elements of the two input streams.
1157 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1158 ///
1159 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1160 /// [`Bounded`], you can use [`Stream::chain`] instead.
1161 ///
1162 /// # Example
1163 /// ```rust
1164 /// # use hydro_lang::*;
1165 /// # use dfir_rs::futures::StreamExt;
1166 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1167 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1168 /// numbers.clone().map(q!(|x| x + 1)).union(numbers)
1169 /// # }, |mut stream| async move {
1170 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1171 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1172 /// # assert_eq!(stream.next().await.unwrap(), w);
1173 /// # }
1174 /// # }));
1175 /// ```
1176 pub fn union<O2>(self, other: Stream<T, L, Unbounded, O2>) -> Stream<T, L, Unbounded, NoOrder> {
1177 let tick = self.location.tick();
1178 unsafe {
1179 // SAFETY: Because the outputs are unordered,
1180 // we can interleave batches from both streams.
1181 self.tick_batch(&tick)
1182 .assume_ordering::<NoOrder>()
1183 .chain(other.tick_batch(&tick).assume_ordering::<NoOrder>())
1184 .all_ticks()
1185 .assume_ordering()
1186 }
1187 }
1188}
1189
1190impl<'a, T, L: Location<'a>, Order> Stream<T, L, Bounded, Order> {
1191 /// Produces a new stream that emits the input elements in sorted order.
1192 ///
1193 /// The input stream can have any ordering guarantee, but the output stream
1194 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1195 /// elements in the input stream are available, so it requires the input stream
1196 /// to be [`Bounded`].
1197 ///
1198 /// # Example
1199 /// ```rust
1200 /// # use hydro_lang::*;
1201 /// # use dfir_rs::futures::StreamExt;
1202 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1203 /// let tick = process.tick();
1204 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1205 /// let batch = unsafe { numbers.tick_batch(&tick) };
1206 /// batch.sort().all_ticks()
1207 /// # }, |mut stream| async move {
1208 /// // 1, 2, 3, 4
1209 /// # for w in (1..5) {
1210 /// # assert_eq!(stream.next().await.unwrap(), w);
1211 /// # }
1212 /// # }));
1213 /// ```
1214 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder>
1215 where
1216 T: Ord,
1217 {
1218 Stream::new(
1219 self.location.clone(),
1220 HydroNode::Sort {
1221 input: Box::new(self.ir_node.into_inner()),
1222 metadata: self.location.new_node_metadata::<T>(),
1223 },
1224 )
1225 }
1226
1227 /// Produces a new stream that first emits the elements of the `self` stream,
1228 /// and then emits the elements of the `other` stream. The output stream has
1229 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1230 /// [`TotalOrder`] guarantee.
1231 ///
1232 /// Currently, both input streams must be [`Bounded`]. This operator will block
1233 /// on the first stream until all its elements are available. In a future version,
1234 /// we will relax the requirement on the `other` stream.
1235 ///
1236 /// # Example
1237 /// ```rust
1238 /// # use hydro_lang::*;
1239 /// # use dfir_rs::futures::StreamExt;
1240 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1241 /// let tick = process.tick();
1242 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1243 /// let batch = unsafe { numbers.tick_batch(&tick) };
1244 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1245 /// # }, |mut stream| async move {
1246 /// // 2, 3, 4, 5, 1, 2, 3, 4
1247 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1248 /// # assert_eq!(stream.next().await.unwrap(), w);
1249 /// # }
1250 /// # }));
1251 /// ```
1252 pub fn chain<O2>(self, other: Stream<T, L, Bounded, O2>) -> Stream<T, L, Bounded, Order::Min>
1253 where
1254 Order: MinOrder<O2>,
1255 {
1256 check_matching_location(&self.location, &other.location);
1257
1258 Stream::new(
1259 self.location.clone(),
1260 HydroNode::Chain {
1261 first: Box::new(self.ir_node.into_inner()),
1262 second: Box::new(other.ir_node.into_inner()),
1263 metadata: self.location.new_node_metadata::<T>(),
1264 },
1265 )
1266 }
1267}
1268
impl<'a, K, V1, L: Location<'a>, B, Order> Stream<(K, V1), L, B, Order> {
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use std::collections::HashSet;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// ```
    pub fn join<V2, O2>(self, n: Stream<(K, V2), L, B, O2>) -> Stream<(K, (V1, V2)), L, B, NoOrder>
    where
        K: Eq + Hash,
    {
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Join {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(n.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
            },
        )
    }

    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use dfir_rs::futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = unsafe {
    ///     process
    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///     .tick_batch(&tick)
    /// };
    /// let batch = unsafe {
    ///     process
    ///     .source_iter(q!(vec![1, 2]))
    ///     .tick_batch(&tick)
    /// };
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn anti_join<O2>(self, n: Stream<K, L, Bounded, O2>) -> Stream<(K, V1), L, B, Order>
    where
        K: Eq + Hash,
    {
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.into_inner()),
                neg: Box::new(n.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<(K, V1)>(),
            },
        )
    }
}
1347
1348impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick<L>, Bounded> {
1349 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1350 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1351 /// in the second element are accumulated via the `comb` closure.
1352 ///
1353 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1354 /// to depend on the order of elements in the stream.
1355 ///
1356 /// If the input and output value types are the same and do not require initialization then use
1357 /// [`Stream::reduce_keyed`].
1358 ///
1359 /// # Example
1360 /// ```rust
1361 /// # use hydro_lang::*;
1362 /// # use dfir_rs::futures::StreamExt;
1363 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1364 /// let tick = process.tick();
1365 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1366 /// let batch = unsafe { numbers.tick_batch(&tick) };
1367 /// batch
1368 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1369 /// .all_ticks()
1370 /// # }, |mut stream| async move {
1371 /// // (1, 5), (2, 7)
1372 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1373 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1374 /// # }));
1375 /// ```
1376 pub fn fold_keyed<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>(
1377 self,
1378 init: impl IntoQuotedMut<'a, I, Tick<L>>,
1379 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1380 ) -> Stream<(K, A), Tick<L>, Bounded> {
1381 let init = init.splice_fn0_ctx(&self.location).into();
1382 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1383
1384 Stream::new(
1385 self.location.clone(),
1386 HydroNode::FoldKeyed {
1387 init,
1388 acc: comb,
1389 input: Box::new(self.ir_node.into_inner()),
1390 metadata: self.location.new_node_metadata::<(K, A)>(),
1391 },
1392 )
1393 }
1394
1395 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1396 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1397 /// in the second element are accumulated via the `comb` closure.
1398 ///
1399 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1400 /// to depend on the order of elements in the stream.
1401 ///
1402 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1403 ///
1404 /// # Example
1405 /// ```rust
1406 /// # use hydro_lang::*;
1407 /// # use dfir_rs::futures::StreamExt;
1408 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1409 /// let tick = process.tick();
1410 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1411 /// let batch = unsafe { numbers.tick_batch(&tick) };
1412 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1413 /// # }, |mut stream| async move {
1414 /// // (1, 5), (2, 7)
1415 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1416 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1417 /// # }));
1418 /// ```
1419 pub fn reduce_keyed<F: Fn(&mut V, V) + 'a>(
1420 self,
1421 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1422 ) -> Stream<(K, V), Tick<L>, Bounded> {
1423 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1424
1425 Stream::new(
1426 self.location.clone(),
1427 HydroNode::ReduceKeyed {
1428 f,
1429 input: Box::new(self.ir_node.into_inner()),
1430 metadata: self.location.new_node_metadata::<(K, V)>(),
1431 },
1432 )
1433 }
1434}
1435
1436impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick<L>, Bounded, Order> {
1437 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1438 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1439 /// in the second element are accumulated via the `comb` closure.
1440 ///
1441 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1442 ///
1443 /// If the input and output value types are the same and do not require initialization then use
1444 /// [`Stream::reduce_keyed_commutative`].
1445 ///
1446 /// # Example
1447 /// ```rust
1448 /// # use hydro_lang::*;
1449 /// # use dfir_rs::futures::StreamExt;
1450 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1451 /// let tick = process.tick();
1452 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1453 /// let batch = unsafe { numbers.tick_batch(&tick) };
1454 /// batch
1455 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1456 /// .all_ticks()
1457 /// # }, |mut stream| async move {
1458 /// // (1, 5), (2, 7)
1459 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1460 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1461 /// # }));
1462 /// ```
1463 pub fn fold_keyed_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V) + 'a>(
1464 self,
1465 init: impl IntoQuotedMut<'a, I, Tick<L>>,
1466 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1467 ) -> Stream<(K, A), Tick<L>, Bounded, Order> {
1468 let init = init.splice_fn0_ctx(&self.location).into();
1469 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1470
1471 Stream::new(
1472 self.location.clone(),
1473 HydroNode::FoldKeyed {
1474 init,
1475 acc: comb,
1476 input: Box::new(self.ir_node.into_inner()),
1477 metadata: self.location.new_node_metadata::<(K, A)>(),
1478 },
1479 )
1480 }
1481
1482 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
1483 /// # Example
1484 /// ```rust
1485 /// # use hydro_lang::*;
1486 /// # use dfir_rs::futures::StreamExt;
1487 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1488 /// let tick = process.tick();
1489 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1490 /// let batch = unsafe { numbers.tick_batch(&tick) };
1491 /// batch.keys().all_ticks()
1492 /// # }, |mut stream| async move {
1493 /// // 1, 2
1494 /// # assert_eq!(stream.next().await.unwrap(), 1);
1495 /// # assert_eq!(stream.next().await.unwrap(), 2);
1496 /// # }));
1497 /// ```
1498 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, Order> {
1499 self.fold_keyed_commutative(q!(|| ()), q!(|_, _| {}))
1500 .map(q!(|(k, _)| k))
1501 }
1502
1503 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1504 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1505 /// in the second element are accumulated via the `comb` closure.
1506 ///
1507 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1508 ///
1509 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
1510 ///
1511 /// # Example
1512 /// ```rust
1513 /// # use hydro_lang::*;
1514 /// # use dfir_rs::futures::StreamExt;
1515 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1516 /// let tick = process.tick();
1517 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1518 /// let batch = unsafe { numbers.tick_batch(&tick) };
1519 /// batch
1520 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
1521 /// .all_ticks()
1522 /// # }, |mut stream| async move {
1523 /// // (1, 5), (2, 7)
1524 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1525 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1526 /// # }));
1527 /// ```
1528 pub fn reduce_keyed_commutative<F: Fn(&mut V, V) + 'a>(
1529 self,
1530 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1531 ) -> Stream<(K, V), Tick<L>, Bounded, Order> {
1532 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1533
1534 Stream::new(
1535 self.location.clone(),
1536 HydroNode::ReduceKeyed {
1537 f,
1538 input: Box::new(self.ir_node.into_inner()),
1539 metadata: self.location.new_node_metadata::<(K, V)>(),
1540 },
1541 )
1542 }
1543}
1544
impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, Atomic<L>, B, Order> {
    /// Returns a stream corresponding to the latest batch of elements being atomically
    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Safety
    /// The batch boundaries are non-deterministic and may change across executions.
    pub unsafe fn tick_batch(self) -> Stream<T, Tick<L>, Bounded, Order> {
        Stream::new(
            self.location.clone().tick,
            HydroNode::Unpersist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Exits the atomic section, returning the stream at the underlying
    /// (non-atomic) location. The elements and the IR are unchanged.
    pub fn end_atomic(self) -> Stream<T, L, B, Order> {
        Stream::new(self.location.tick.l, self.ir_node.into_inner())
    }

    /// Returns the [`Tick`] within which this stream is atomically processed.
    pub fn atomic_source(&self) -> Tick<L> {
        self.location.tick.clone()
    }
}
1570
impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B, Order> Stream<T, L, B, Order> {
    /// Wraps this stream in the [`Atomic`] context of `tick` without changing
    /// its elements; batches taken inside the atomic context are guaranteed to
    /// be contiguous across ticks (see [`Stream::tick_batch`]).
    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, Order> {
        Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
    }

    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Safety
    /// The batch boundaries are non-deterministic and may change across executions.
    pub unsafe fn tick_batch(self, tick: &Tick<L>) -> Stream<T, Tick<L>, Bounded, Order> {
        unsafe { self.atomic(tick).tick_batch() }
    }

    /// Given a time interval, returns a stream corresponding to samples taken from the
    /// stream roughly at that interval. The output will have elements in the same order
    /// as the input, but with arbitrary elements skipped between samples. There is also
    /// no guarantee on the exact timing of the samples.
    ///
    /// # Safety
    /// The output stream is non-deterministic in which elements are sampled, since this
    /// is controlled by a clock.
    pub unsafe fn sample_every(
        self,
        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
    ) -> Stream<T, L, Unbounded, Order> {
        let samples = unsafe {
            // SAFETY: source of intentional non-determinism
            self.location.source_interval(interval)
        };

        let tick = self.location.tick();
        unsafe {
            // SAFETY: source of intentional non-determinism
            // A tick's batch is emitted only when the interval stream produced
            // at least one element during that tick (continue_if).
            self.tick_batch(&tick)
                .continue_if(samples.tick_batch(&tick).first())
                .all_ticks()
        }
    }

    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Safety
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub unsafe fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
    ) -> Optional<(), L, Unbounded>
    where
        Order: MinOrder<NoOrder, Min = NoOrder>,
    {
        let tick = self.location.tick();

        // Record the wall-clock time of the most recent element; the value
        // itself is discarded, so the fold is commutative.
        let latest_received = self.fold_commutative(
            q!(|| None),
            q!(|latest, _| {
                *latest = Some(Instant::now());
            }),
        );

        unsafe {
            // SAFETY: Non-deterministic delay in detecting a timeout is expected.
            latest_received.latest_tick(&tick)
        }
        .filter_map(q!(move |latest_received| {
            // A timeout fires when no element has ever arrived, or the most
            // recent arrival is older than `duration`.
            if let Some(latest_received) = latest_received {
                if Instant::now().duration_since(latest_received) > duration {
                    Some(())
                } else {
                    None
                }
            } else {
                Some(())
            }
        }))
        .latest()
    }
}
1655
impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order> {
    /// Consumes the stream, invoking `f` on each element. This is a terminal
    /// operator: it registers a [`HydroLeaf::ForEach`] in the flow graph, so it
    /// must be called before the flow is compiled.
    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
        let f = f.splice_fn1_ctx(&self.location).into();
        let metadata = self.location.new_node_metadata::<T>();
        self.location
            .flow_state()
            .borrow_mut()
            .leaves
            .as_mut()
            .expect(FLOW_USED_MESSAGE)
            .push(HydroLeaf::ForEach {
                // Top-level streams are wrapped in Persist (same pattern as
                // `enumerate`); Unpersist recovers the underlying elements.
                input: Box::new(HydroNode::Unpersist {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: metadata.clone(),
                }),
                f,
                metadata,
            });
    }

    /// Consumes the stream by forwarding every element into the provided
    /// [`futures::Sink`] (e.g. a channel sender). This is a terminal operator:
    /// it registers a [`HydroLeaf::DestSink`] in the flow graph, so it must be
    /// called before the flow is compiled.
    pub fn dest_sink<S: Unpin + futures::Sink<T> + 'a>(
        self,
        sink: impl QuotedWithContext<'a, S, L>,
    ) {
        self.location
            .flow_state()
            .borrow_mut()
            .leaves
            .as_mut()
            .expect(FLOW_USED_MESSAGE)
            .push(HydroLeaf::DestSink {
                sink: sink.splice_typed_ctx(&self.location).into(),
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            });
    }
}
1693
impl<'a, T, L: Location<'a>, Order> Stream<T, Tick<L>, Bounded, Order> {
    /// Releases the elements of each tick's batch into an unbounded stream at
    /// the enclosing (non-tick) location, accumulating the output of all ticks.
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, Order> {
        Stream::new(
            self.location.outer().clone(),
            HydroNode::Persist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Like [`Stream::all_ticks`], but the result stays inside the [`Atomic`]
    /// context of this tick rather than escaping to the outer location.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, Order> {
        Stream::new(
            Atomic {
                tick: self.location.clone(),
            },
            HydroNode::Persist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Persists this batch so its elements remain available on subsequent
    /// ticks (wraps the IR in [`HydroNode::Persist`]); `T: Clone` is required
    /// since elements are re-emitted across ticks.
    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, Order>
    where
        T: Clone,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Persist {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Delays the elements of this batch to the next tick (wraps the IR in
    /// [`HydroNode::DeferTick`]); commonly used to build cyclic tick-local logic.
    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, Order> {
        Stream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }

    /// Wraps the IR in [`HydroNode::Delta`]. NOTE(review): presumably yields
    /// only the elements that are new relative to persisted state — confirm
    /// against the `Delta` node's semantics.
    pub fn delta(self) -> Stream<T, Tick<L>, Bounded, Order> {
        Stream::new(
            self.location.clone(),
            HydroNode::Delta {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata::<T>(),
            },
        )
    }
}
1750
/// Builds a [`syn::Expr`] closure that bincode-serializes values of type `t_type`.
///
/// When `is_demux` is true, the generated closure takes `(ClusterId<_>, data)`
/// pairs and returns `(raw_id, bytes)` so the network layer can route each
/// value to the right cluster member; otherwise it takes a bare value and
/// returns the serialized bytes.
pub fn serialize_bincode_with_type(is_demux: bool, t_type: syn::Type) -> syn::Expr {
    let root = get_this_crate();

    if is_demux {
        parse_quote! {
            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::ClusterId<_>, #t_type), _>(
                |(id, data)| {
                    (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
                }
            )
        }
    } else {
        parse_quote! {
            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
                |data| {
                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
                }
            )
        }
    }
}
1772
/// Convenience wrapper over [`serialize_bincode_with_type`] that derives the
/// staged type from the Rust type parameter `T`.
fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
    serialize_bincode_with_type(is_demux, stageleft::quote_type::<T>())
}
1776
/// Builds a [`syn::Expr`] closure that bincode-deserializes received bytes into
/// values of type `t_type`.
///
/// When `tagged` is `Some(c_type)`, incoming items are `(raw_id, bytes)` pairs
/// and the generated closure reconstructs the sender's `ClusterId<c_type>` tag
/// alongside the payload; otherwise the closure deserializes the payload alone.
pub fn deserialize_bincode_with_type(tagged: Option<syn::Type>, t_type: syn::Type) -> syn::Expr {
    let root = get_this_crate();

    if let Some(c_type) = tagged {
        parse_quote! {
            |res| {
                let (id, b) = res.unwrap();
                (#root::ClusterId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
            }
        }
    } else {
        parse_quote! {
            |res| {
                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
            }
        }
    }
}
1795
/// Convenience wrapper over [`deserialize_bincode_with_type`] that derives the
/// staged type from the Rust type parameter `T`.
pub(super) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<syn::Type>) -> syn::Expr {
    deserialize_bincode_with_type(tagged, stageleft::quote_type::<T>())
}
1799
1800impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order> {
    /// Sends the elements of this stream to the location `other`, serializing
    /// each element with bincode on the way out and deserializing on arrival.
    ///
    /// The output ordering is `Order::Min`: the weaker of this stream's
    /// ordering and the strongest ordering the network link provides
    /// ([`CanSend::OutStrongestOrder`]).
    pub fn send_bincode<L2: Location<'a>, CoreType>(
        self,
        other: &L2,
    ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<CoreType>, L2, Unbounded, Order::Min>
    where
        L::Root: CanSend<'a, L2, In<CoreType> = T>,
        CoreType: Serialize + DeserializeOwned,
        Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
    {
        // When demuxing (sending to a cluster), the serializer also extracts the
        // destination member id; when the sender is tagged (a cluster), the
        // deserializer re-attaches the sender's ClusterId.
        let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::Root::is_demux()));

        let deserialize_pipeline = Some(deserialize_bincode::<CoreType>(L::Root::tagged_type()));

        Stream::new(
            other.clone(),
            HydroNode::Network {
                from_key: None,
                to_location: other.id(),
                to_key: None,
                serialize_fn: serialize_pipeline.map(|e| e.into()),
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
                input: Box::new(self.ir_node.into_inner()),
                metadata: other.new_node_metadata::<CoreType>(),
            },
        )
    }
1828
    /// Sends the elements of this stream to an external (non-Hydro) process,
    /// returning a port handle the external process can use to receive the
    /// bincode-serialized values.
    pub fn send_bincode_external<L2: 'a, CoreType>(
        self,
        other: &ExternalProcess<L2>,
    ) -> ExternalBincodeStream<L::Out<CoreType>>
    where
        L: CanSend<'a, ExternalProcess<'a, L2>, In<CoreType> = T, Out<CoreType> = CoreType>,
        CoreType: Serialize + DeserializeOwned,
        // for now, we restrict Out<CoreType> to be CoreType, which means no tagged cluster -> external
    {
        let serialize_pipeline = Some(serialize_bincode::<CoreType>(L::is_demux()));

        let metadata = other.new_node_metadata::<CoreType>();

        let mut flow_state_borrow = self.location.flow_state().borrow_mut();

        // Allocate a fresh external output port id for this connection.
        let external_key = flow_state_borrow.next_external_out;
        flow_state_borrow.next_external_out += 1;

        let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");

        // NOTE(review): the ForEach leaf with a `()` placeholder closure appears
        // to exist only to anchor the Network node in the leaf list (no
        // deserialization happens on this side) — confirm.
        let dummy_f: syn::Expr = syn::parse_quote!(());

        leaves.push(HydroLeaf::ForEach {
            f: dummy_f.into(),
            input: Box::new(HydroNode::Network {
                from_key: None,
                to_location: other.id(),
                to_key: Some(external_key),
                serialize_fn: serialize_pipeline.map(|e| e.into()),
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                input: Box::new(self.ir_node.into_inner()),
                metadata: metadata.clone(),
            }),
            metadata,
        });

        ExternalBincodeStream {
            process_id: other.id,
            port_id: external_key,
            _phantom: PhantomData,
        }
    }
1872
1873 pub fn send_bytes<L2: Location<'a>>(
1874 self,
1875 other: &L2,
1876 ) -> Stream<<L::Root as CanSend<'a, L2>>::Out<Bytes>, L2, Unbounded, Order::Min>
1877 where
1878 L::Root: CanSend<'a, L2, In<Bytes> = T>,
1879 Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
1880 {
1881 let root = get_this_crate();
1882 Stream::new(
1883 other.clone(),
1884 HydroNode::Network {
1885 from_key: None,
1886 to_location: other.id(),
1887 to_key: None,
1888 serialize_fn: None,
1889 instantiate_fn: DebugInstantiate::Building,
1890 deserialize_fn: if let Some(c_type) = L::Root::tagged_type() {
1891 let expr: syn::Expr = parse_quote!(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()));
1892 Some(expr.into())
1893 } else {
1894 let expr: syn::Expr = parse_quote!(|b| b.unwrap().freeze());
1895 Some(expr.into())
1896 },
1897 input: Box::new(self.ir_node.into_inner()),
1898 metadata: other.new_node_metadata::<Bytes>(),
1899 },
1900 )
1901 }
1902
1903 pub fn send_bytes_external<L2: 'a>(self, other: &ExternalProcess<L2>) -> ExternalBytesPort
1904 where
1905 L::Root: CanSend<'a, ExternalProcess<'a, L2>, In<Bytes> = T, Out<Bytes> = Bytes>,
1906 {
1907 let metadata = other.new_node_metadata::<Bytes>();
1908
1909 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1910 let external_key = flow_state_borrow.next_external_out;
1911 flow_state_borrow.next_external_out += 1;
1912
1913 let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
1914
1915 let dummy_f: syn::Expr = syn::parse_quote!(());
1916
1917 leaves.push(HydroLeaf::ForEach {
1918 f: dummy_f.into(),
1919 input: Box::new(HydroNode::Network {
1920 from_key: None,
1921 to_location: other.id(),
1922 to_key: Some(external_key),
1923 serialize_fn: None,
1924 instantiate_fn: DebugInstantiate::Building,
1925 deserialize_fn: None,
1926 input: Box::new(self.ir_node.into_inner()),
1927 metadata: metadata.clone(),
1928 }),
1929 metadata,
1930 });
1931
1932 ExternalBytesPort {
1933 process_id: other.id,
1934 port_id: external_key,
1935 }
1936 }
1937
1938 pub fn send_bincode_anonymous<L2: Location<'a>, Tag, CoreType>(
1939 self,
1940 other: &L2,
1941 ) -> Stream<CoreType, L2, Unbounded, Order::Min>
1942 where
1943 L::Root: CanSend<'a, L2, In<CoreType> = T, Out<CoreType> = (Tag, CoreType)>,
1944 CoreType: Serialize + DeserializeOwned,
1945 Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
1946 {
1947 self.send_bincode::<L2, CoreType>(other).map(q!(|(_, b)| b))
1948 }
1949
1950 pub fn send_bytes_anonymous<L2: Location<'a>, Tag>(
1951 self,
1952 other: &L2,
1953 ) -> Stream<Bytes, L2, Unbounded, Order::Min>
1954 where
1955 L::Root: CanSend<'a, L2, In<Bytes> = T, Out<Bytes> = (Tag, Bytes)>,
1956 Order: MinOrder<<L::Root as CanSend<'a, L2>>::OutStrongestOrder<Order>>,
1957 {
1958 self.send_bytes::<L2>(other).map(q!(|(_, b)| b))
1959 }
1960
1961 #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
1962 pub fn broadcast_bincode<C2: 'a>(
1963 self,
1964 other: &Cluster<'a, C2>,
1965 ) -> Stream<
1966 <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
1967 Cluster<'a, C2>,
1968 Unbounded,
1969 Order::Min,
1970 >
1971 where
1972 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
1973 T: Clone + Serialize + DeserializeOwned,
1974 Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
1975 {
1976 let ids = other.members();
1977
1978 self.flat_map_ordered(q!(|b| ids.iter().map(move |id| (
1979 ::std::clone::Clone::clone(id),
1980 ::std::clone::Clone::clone(&b)
1981 ))))
1982 .send_bincode(other)
1983 }
1984
1985 pub fn broadcast_bincode_anonymous<C2: 'a, Tag>(
1986 self,
1987 other: &Cluster<'a, C2>,
1988 ) -> Stream<T, Cluster<'a, C2>, Unbounded, Order::Min>
1989 where
1990 L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
1991 T: Clone + Serialize + DeserializeOwned,
1992 Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
1993 {
1994 self.broadcast_bincode(other).map(q!(|(_, b)| b))
1995 }
1996
1997 #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")]
1998 pub fn broadcast_bytes<C2: 'a>(
1999 self,
2000 other: &Cluster<'a, C2>,
2001 ) -> Stream<
2002 <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
2003 Cluster<'a, C2>,
2004 Unbounded,
2005 Order::Min,
2006 >
2007 where
2008 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
2009 T: Clone,
2010 Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
2011 {
2012 let ids = other.members();
2013
2014 self.flat_map_ordered(q!(|b| ids.iter().map(move |id| (
2015 ::std::clone::Clone::clone(id),
2016 ::std::clone::Clone::clone(&b)
2017 ))))
2018 .send_bytes(other)
2019 }
2020
2021 pub fn broadcast_bytes_anonymous<C2: 'a, Tag>(
2022 self,
2023 other: &Cluster<'a, C2>,
2024 ) -> Stream<Bytes, Cluster<'a, C2>, Unbounded, Order::Min>
2025 where
2026 L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
2027 + 'a,
2028 T: Clone,
2029 Order: MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<Order>>,
2030 {
2031 self.broadcast_bytes(other).map(q!(|(_, b)| b))
2032 }
2033}
2034
#[expect(clippy::type_complexity, reason = "ordering semantics for round-robin")]
impl<'a, T, L: Location<'a> + NoTick, B> Stream<T, L, B, TotalOrder> {
    /// Distributes elements of this stream across the members of `other` in
    /// round-robin fashion (element `i` goes to member `i % member_count`),
    /// sending each with `bincode`.
    ///
    /// Only available on `TotalOrder` streams, since the assignment depends on
    /// a deterministic element index from `enumerate`.
    pub fn round_robin_bincode<C2: 'a>(
        self,
        other: &Cluster<'a, C2>,
    ) -> Stream<
        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<T>,
        Cluster<'a, C2>,
        Unbounded,
        <TotalOrder as MinOrder<
            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
        >>::Min,
    >
    where
        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T)>,
        T: Clone + Serialize + DeserializeOwned,
        TotalOrder:
            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
    {
        let ids = other.members();

        // Index each element, then pick the target member by index modulo
        // cluster size.
        self.enumerate()
            .map(q!(|(i, w)| (ids[i % ids.len()], w)))
            .send_bincode(other)
    }

    /// Like [`Stream::round_robin_bincode`], but drops the sender tag on the
    /// receiving side, yielding only the payload.
    pub fn round_robin_bincode_anonymous<C2: 'a, Tag>(
        self,
        other: &Cluster<'a, C2>,
    ) -> Stream<
        T,
        Cluster<'a, C2>,
        Unbounded,
        <TotalOrder as MinOrder<
            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
        >>::Min,
    >
    where
        L::Root: CanSend<'a, Cluster<'a, C2>, In<T> = (ClusterId<C2>, T), Out<T> = (Tag, T)> + 'a,
        T: Clone + Serialize + DeserializeOwned,
        TotalOrder:
            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
    {
        self.round_robin_bincode(other).map(q!(|(_, b)| b))
    }

    /// Distributes elements of this stream across the members of `other` in
    /// round-robin fashion, sending raw bytes without serialization.
    pub fn round_robin_bytes<C2: 'a>(
        self,
        other: &Cluster<'a, C2>,
    ) -> Stream<
        <L::Root as CanSend<'a, Cluster<'a, C2>>>::Out<Bytes>,
        Cluster<'a, C2>,
        Unbounded,
        <TotalOrder as MinOrder<
            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
        >>::Min,
    >
    where
        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T)> + 'a,
        T: Clone,
        TotalOrder:
            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
    {
        let ids = other.members();

        // Index each element, then pick the target member by index modulo
        // cluster size.
        self.enumerate()
            .map(q!(|(i, w)| (ids[i % ids.len()], w)))
            .send_bytes(other)
    }

    /// Like [`Stream::round_robin_bytes`], but drops the sender tag on the
    /// receiving side, yielding only the raw bytes.
    pub fn round_robin_bytes_anonymous<C2: 'a, Tag>(
        self,
        other: &Cluster<'a, C2>,
    ) -> Stream<
        Bytes,
        Cluster<'a, C2>,
        Unbounded,
        <TotalOrder as MinOrder<
            <L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>,
        >>::Min,
    >
    where
        L::Root: CanSend<'a, Cluster<'a, C2>, In<Bytes> = (ClusterId<C2>, T), Out<Bytes> = (Tag, Bytes)>
            + 'a,
        T: Clone,
        TotalOrder:
            MinOrder<<L::Root as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder<TotalOrder>>,
    {
        self.round_robin_bytes(other).map(q!(|(_, b)| b))
    }
}
2126
#[cfg(test)]
mod tests {
    use dfir_rs::futures::StreamExt;
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::FlowBuilder;
    use crate::location::Location;

    struct P1 {}
    struct P2 {}

    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// End-to-end check: ten integers travel P1 -> P2 via `send_bincode`,
    /// then out through an external port, and arrive in their original order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let sender = builder.process::<P1>();
        let receiver = builder.process::<P2>();
        let external = builder.external_process::<P2>();

        // Wrap each number and route it sender -> receiver -> external port.
        let out_port = sender
            .source_iter(q!(0..10))
            .map(q!(|value| SendOverNetwork { n: value }))
            .send_bincode(&receiver)
            .send_bincode_external(&external);

        let nodes = builder
            .with_process(&sender, deployment.Localhost())
            .with_process(&receiver, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for expected in 0..10u32 {
            assert_eq!(received.next().await.unwrap().n, expected);
        }
    }
}