hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{
15 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource, TeeNode,
16};
17#[cfg(stageleft_runtime)]
18use crate::forward_handle::{CycleCollection, ReceiverComplete};
19use crate::forward_handle::{ForwardRef, TickCycle};
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, NoTick, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25
/// A *nullable* Rust value that can asynchronously change over time.
///
/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
///
/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
///
/// Type Parameters:
/// - `Type`: the type of the value in this optional (when it is not null)
/// - `Loc`: the [`Location`] where the optional is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    /// The location this optional is materialized at; operators in this file derive the
    /// metadata of new IR nodes from it.
    pub(crate) location: Loc,
    /// The IR node that produces this optional. It is held in a [`RefCell`] so that `clone`
    /// can swap it for a `Tee` node through a shared reference.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Marker for the type parameters; `Type` and `Bound` are not stored in any field.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
45
/// Lets a bounded, in-tick optional be used wherever a [`DeferTick`] collection is expected.
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    /// Forwards to `Optional::defer_tick`.
    // NOTE(review): presumably this resolves to an inherent `defer_tick` method defined outside
    // the visible part of this file (otherwise the call would recurse into itself) — confirm.
    fn defer_tick(self) -> Self {
        Optional::defer_tick(self)
    }
}
54
55impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
56where
57 L: Location<'a>,
58{
59 type Location = Tick<L>;
60
61 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
62 Optional::new(
63 location.clone(),
64 HydroNode::CycleSource {
65 ident,
66 metadata: location.new_node_metadata(Self::collection_kind()),
67 },
68 )
69 }
70}
71
72impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
73where
74 L: Location<'a>,
75{
76 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
77 assert_eq!(
78 Location::id(&self.location),
79 expected_location,
80 "locations do not match"
81 );
82 self.location
83 .flow_state()
84 .borrow_mut()
85 .push_root(HydroRoot::CycleSink {
86 ident,
87 input: Box::new(self.ir_node.into_inner()),
88 op_metadata: HydroIrOpMetadata::new(),
89 });
90 }
91}
92
93impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
94where
95 L: Location<'a>,
96{
97 type Location = Tick<L>;
98
99 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
100 Optional::new(
101 location.clone(),
102 HydroNode::CycleSource {
103 ident,
104 metadata: location.new_node_metadata(Self::collection_kind()),
105 },
106 )
107 }
108}
109
110impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
111where
112 L: Location<'a>,
113{
114 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
115 assert_eq!(
116 Location::id(&self.location),
117 expected_location,
118 "locations do not match"
119 );
120 self.location
121 .flow_state()
122 .borrow_mut()
123 .push_root(HydroRoot::CycleSink {
124 ident,
125 input: Box::new(self.ir_node.into_inner()),
126 op_metadata: HydroIrOpMetadata::new(),
127 });
128 }
129}
130
131impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
132where
133 L: Location<'a> + NoTick,
134{
135 type Location = L;
136
137 fn create_source(ident: syn::Ident, location: L) -> Self {
138 Optional::new(
139 location.clone(),
140 HydroNode::CycleSource {
141 ident,
142 metadata: location.new_node_metadata(Self::collection_kind()),
143 },
144 )
145 }
146}
147
148impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
149where
150 L: Location<'a> + NoTick,
151{
152 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
153 assert_eq!(
154 Location::id(&self.location),
155 expected_location,
156 "locations do not match"
157 );
158 self.location
159 .flow_state()
160 .borrow_mut()
161 .push_root(HydroRoot::CycleSink {
162 ident,
163 input: Box::new(self.ir_node.into_inner()),
164 op_metadata: HydroIrOpMetadata::new(),
165 });
166 }
167}
168
169impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
170where
171 L: Location<'a>,
172{
173 fn from(singleton: Optional<T, L, Bounded>) -> Self {
174 Optional::new(singleton.location, singleton.ir_node.into_inner())
175 }
176}
177
178impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
179where
180 L: Location<'a>,
181{
182 fn from(singleton: Singleton<T, L, B>) -> Self {
183 Optional::new(
184 singleton.location.clone(),
185 HydroNode::Cast {
186 inner: Box::new(singleton.ir_node.into_inner()),
187 metadata: singleton
188 .location
189 .new_node_metadata(Self::collection_kind()),
190 },
191 )
192 }
193}
194
/// Pairs two optionals at the same location with a `HydroNode::CrossSingleton`, with `me`
/// as the left input and `other` as the right input.
///
/// The locations of both inputs are checked with `check_matching_location` first.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let out_location = me.location.clone();
    let left = Box::new(me.ir_node.into_inner());
    let right = Box::new(other.ir_node.into_inner());
    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());

    Optional::new(
        out_location,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
213
/// Combines two optionals at the same location with a `HydroNode::ChainFirst`, with `me`
/// as the first input and `other` as the second input.
///
/// The locations of both inputs are checked with `check_matching_location` first.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let out_location = me.location.clone();
    let first = Box::new(me.ir_node.into_inner());
    let second = Box::new(other.ir_node.into_inner());
    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());

    Optional::new(
        out_location,
        HydroNode::ChainFirst {
            first,
            second,
            metadata,
        },
    )
}
232
/// Cloning an optional shares its IR node through a `HydroNode::Tee` instead of copying it.
impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
where
    T: Clone,
    L: Location<'a>,
{
    /// Returns a new handle that points at the same underlying IR node as `self`.
    ///
    /// If `self` is not yet a `Tee`, its node is first replaced (in place, through the
    /// `RefCell`) by a `Tee` wrapping the original node, so both handles share it.
    fn clone(&self) -> Self {
        // Convert `self` into a `Tee` the first time it is cloned.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            // Move the original node out, leaving a placeholder until the `Tee` is written back.
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // At this point `self` always holds a `Tee`; the clone gets another reference to its
        // shared inner node and a copy of its metadata.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Optional {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
262
263impl<'a, T, L, B: Boundedness> Optional<T, L, B>
264where
265 L: Location<'a>,
266{
267 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
268 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
269 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
270 Optional {
271 location,
272 ir_node: RefCell::new(ir_node),
273 _phantom: PhantomData,
274 }
275 }
276
277 pub(crate) fn collection_kind() -> CollectionKind {
278 CollectionKind::Optional {
279 bound: B::BOUND_KIND,
280 element_type: stageleft::quote_type::<T>().into(),
281 }
282 }
283
284 /// Transforms the optional value by applying a function `f` to it,
285 /// continuously as the input is updated.
286 ///
287 /// Whenever the optional is empty, the output optional is also empty.
288 ///
289 /// # Example
290 /// ```rust
291 /// # use hydro_lang::prelude::*;
292 /// # use futures::StreamExt;
293 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
294 /// let tick = process.tick();
295 /// let optional = tick.optional_first_tick(q!(1));
296 /// optional.map(q!(|v| v + 1)).all_ticks()
297 /// # }, |mut stream| async move {
298 /// // 2
299 /// # assert_eq!(stream.next().await.unwrap(), 2);
300 /// # }));
301 /// ```
302 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
303 where
304 F: Fn(T) -> U + 'a,
305 {
306 let f = f.splice_fn1_ctx(&self.location).into();
307 Optional::new(
308 self.location.clone(),
309 HydroNode::Map {
310 f,
311 input: Box::new(self.ir_node.into_inner()),
312 metadata: self
313 .location
314 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
315 },
316 )
317 }
318
319 /// Transforms the optional value by applying a function `f` to it and then flattening
320 /// the result into a stream, preserving the order of elements.
321 ///
322 /// If the optional is empty, the output stream is also empty. If the optional contains
323 /// a value, `f` is applied to produce an iterator, and all items from that iterator
324 /// are emitted in the output stream in deterministic order.
325 ///
326 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
327 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
328 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
329 ///
330 /// # Example
331 /// ```rust
332 /// # use hydro_lang::prelude::*;
333 /// # use futures::StreamExt;
334 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
335 /// let tick = process.tick();
336 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
337 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
338 /// # }, |mut stream| async move {
339 /// // 1, 2, 3
340 /// # for w in vec![1, 2, 3] {
341 /// # assert_eq!(stream.next().await.unwrap(), w);
342 /// # }
343 /// # }));
344 /// ```
345 pub fn flat_map_ordered<U, I, F>(
346 self,
347 f: impl IntoQuotedMut<'a, F, L>,
348 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
349 where
350 I: IntoIterator<Item = U>,
351 F: Fn(T) -> I + 'a,
352 {
353 let f = f.splice_fn1_ctx(&self.location).into();
354 Stream::new(
355 self.location.clone(),
356 HydroNode::FlatMap {
357 f,
358 input: Box::new(self.ir_node.into_inner()),
359 metadata: self.location.new_node_metadata(
360 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
361 ),
362 },
363 )
364 }
365
366 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
367 /// for the output type `I` to produce items in any order.
368 ///
369 /// If the optional is empty, the output stream is also empty. If the optional contains
370 /// a value, `f` is applied to produce an iterator, and all items from that iterator
371 /// are emitted in the output stream in non-deterministic order.
372 ///
373 /// # Example
374 /// ```rust
375 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
376 /// # use futures::StreamExt;
377 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
378 /// let tick = process.tick();
379 /// let optional = tick.optional_first_tick(q!(
380 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
381 /// ));
382 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
383 /// # }, |mut stream| async move {
384 /// // 1, 2, 3, but in no particular order
385 /// # let mut results = Vec::new();
386 /// # for _ in 0..3 {
387 /// # results.push(stream.next().await.unwrap());
388 /// # }
389 /// # results.sort();
390 /// # assert_eq!(results, vec![1, 2, 3]);
391 /// # }));
392 /// ```
393 pub fn flat_map_unordered<U, I, F>(
394 self,
395 f: impl IntoQuotedMut<'a, F, L>,
396 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
397 where
398 I: IntoIterator<Item = U>,
399 F: Fn(T) -> I + 'a,
400 {
401 let f = f.splice_fn1_ctx(&self.location).into();
402 Stream::new(
403 self.location.clone(),
404 HydroNode::FlatMap {
405 f,
406 input: Box::new(self.ir_node.into_inner()),
407 metadata: self
408 .location
409 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
410 },
411 )
412 }
413
    /// Flattens the optional value into a stream, preserving the order of elements.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in deterministic order.
    ///
    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
    ///
    /// This is [`Optional::flat_map_ordered`] applied with the identity function.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
    /// optional.flatten_ordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|v| v))
    }
445
    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in non-deterministic order.
    ///
    /// This is [`Optional::flat_map_unordered`] applied with the identity function.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(
    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
    /// ));
    /// optional.flatten_unordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, but in no particular order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|v| v))
    }
479
480 /// Creates an optional containing only the value if it satisfies a predicate `f`.
481 ///
482 /// If the optional is empty, the output optional is also empty. If the optional contains
483 /// a value and the predicate returns `true`, the output optional contains the same value.
484 /// If the predicate returns `false`, the output optional is empty.
485 ///
486 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
487 /// not modify or take ownership of the value. If you need to modify the value while filtering
488 /// use [`Optional::filter_map`] instead.
489 ///
490 /// # Example
491 /// ```rust
492 /// # use hydro_lang::prelude::*;
493 /// # use futures::StreamExt;
494 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
495 /// let tick = process.tick();
496 /// let optional = tick.optional_first_tick(q!(5));
497 /// optional.filter(q!(|&x| x > 3)).all_ticks()
498 /// # }, |mut stream| async move {
499 /// // 5
500 /// # assert_eq!(stream.next().await.unwrap(), 5);
501 /// # }));
502 /// ```
503 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
504 where
505 F: Fn(&T) -> bool + 'a,
506 {
507 let f = f.splice_fn1_borrow_ctx(&self.location).into();
508 Optional::new(
509 self.location.clone(),
510 HydroNode::Filter {
511 f,
512 input: Box::new(self.ir_node.into_inner()),
513 metadata: self.location.new_node_metadata(Self::collection_kind()),
514 },
515 )
516 }
517
518 /// An operator that both filters and maps. It yields only the value if the supplied
519 /// closure `f` returns `Some(value)`.
520 ///
521 /// If the optional is empty, the output optional is also empty. If the optional contains
522 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
523 /// If the closure returns `None`, the output optional is empty.
524 ///
525 /// # Example
526 /// ```rust
527 /// # use hydro_lang::prelude::*;
528 /// # use futures::StreamExt;
529 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
530 /// let tick = process.tick();
531 /// let optional = tick.optional_first_tick(q!("42"));
532 /// optional
533 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
534 /// .all_ticks()
535 /// # }, |mut stream| async move {
536 /// // 42
537 /// # assert_eq!(stream.next().await.unwrap(), 42);
538 /// # }));
539 /// ```
540 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
541 where
542 F: Fn(T) -> Option<U> + 'a,
543 {
544 let f = f.splice_fn1_ctx(&self.location).into();
545 Optional::new(
546 self.location.clone(),
547 HydroNode::FilterMap {
548 f,
549 input: Box::new(self.ir_node.into_inner()),
550 metadata: self
551 .location
552 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
553 },
554 )
555 }
556
    /// Combines this optional with another [`Singleton`] or [`Optional`] by tupling their values.
    ///
    /// If the other value is an [`Optional`], the output will be non-null only if the argument is
    /// non-null. This is useful for combining several pieces of state together.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![123, 456, 789]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let min = numbers.clone().min(); // Optional
    /// let max = numbers.max(); // Optional
    /// min.zip(max).all_ticks()
    /// # }, |mut stream| async move {
    /// // [(123, 789)]
    /// # for w in vec![(123, 789)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
    where
        O: Clone,
    {
        let other: Optional<O, L, B> = other.into();
        check_matching_location(&self.location, &other.location);

        // At a top-level location that can provide a tick, both inputs are snapshotted into
        // that tick, zipped there, and the result is brought back out with `latest()`.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            // Re-wrap the result so that it carries this method's `L` and `B` type parameters.
            Optional::new(out.location, out.ir_node.into_inner())
        } else {
            // Otherwise the inputs are zipped directly at their current location.
            zip_inside_tick(self, other)
        }
    }
602
603 /// Passes through `self` when it has a value, otherwise passes through `other`.
604 ///
605 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
606 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
607 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
608 ///
609 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
610 /// of the inputs change (including to/from null states).
611 ///
612 /// # Example
613 /// ```rust
614 /// # use hydro_lang::prelude::*;
615 /// # use futures::StreamExt;
616 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
617 /// let tick = process.tick();
618 /// // ticks are lazy by default, forces the second tick to run
619 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
620 ///
621 /// let some_first_tick = tick.optional_first_tick(q!(123));
622 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
623 /// some_first_tick.or(some_second_tick).all_ticks()
624 /// # }, |mut stream| async move {
625 /// // [123 /* first tick */, 456 /* second tick */]
626 /// # for w in vec![123, 456] {
627 /// # assert_eq!(stream.next().await.unwrap(), w);
628 /// # }
629 /// # }));
630 /// ```
631 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
632 check_matching_location(&self.location, &other.location);
633
634 if L::is_top_level()
635 && let Some(tick) = self.location.try_tick()
636 {
637 let out = or_inside_tick(
638 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
639 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
640 )
641 .latest();
642
643 Optional::new(out.location, out.ir_node.into_inner())
644 } else {
645 Optional::new(
646 self.location.clone(),
647 HydroNode::ChainFirst {
648 first: Box::new(self.ir_node.into_inner()),
649 second: Box::new(other.ir_node.into_inner()),
650 metadata: self.location.new_node_metadata(Self::collection_kind()),
651 },
652 )
653 }
654 }
655
656 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
657 ///
658 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
659 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
660 ///
661 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
662 /// of the inputs change (including to/from null states).
663 ///
664 /// # Example
665 /// ```rust
666 /// # use hydro_lang::prelude::*;
667 /// # use futures::StreamExt;
668 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
669 /// let tick = process.tick();
670 /// // ticks are lazy by default, forces the later ticks to run
671 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
672 ///
673 /// let some_first_tick = tick.optional_first_tick(q!(123));
674 /// some_first_tick
675 /// .unwrap_or(tick.singleton(q!(456)))
676 /// .all_ticks()
677 /// # }, |mut stream| async move {
678 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
679 /// # for w in vec![123, 456, 456, 456] {
680 /// # assert_eq!(stream.next().await.unwrap(), w);
681 /// # }
682 /// # }));
683 /// ```
684 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
685 let res_option = self.or(other.into());
686 Singleton::new(
687 res_option.location.clone(),
688 HydroNode::Cast {
689 inner: Box::new(res_option.ir_node.into_inner()),
690 metadata: res_option
691 .location
692 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
693 },
694 )
695 }
696
697 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
698 ///
699 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
700 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
701 /// so that Hydro can skip any computation on null values.
702 ///
703 /// # Example
704 /// ```rust
705 /// # use hydro_lang::prelude::*;
706 /// # use futures::StreamExt;
707 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
708 /// let tick = process.tick();
709 /// // ticks are lazy by default, forces the later ticks to run
710 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
711 ///
712 /// let some_first_tick = tick.optional_first_tick(q!(123));
713 /// some_first_tick.into_singleton().all_ticks()
714 /// # }, |mut stream| async move {
715 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
716 /// # for w in vec![Some(123), None, None, None] {
717 /// # assert_eq!(stream.next().await.unwrap(), w);
718 /// # }
719 /// # }));
720 /// ```
721 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
722 where
723 T: Clone,
724 {
725 let none: syn::Expr = parse_quote!([::std::option::Option::None]);
726 let core_ir = HydroNode::Source {
727 source: HydroSource::Iter(none.into()),
728 metadata: self
729 .location
730 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
731 };
732
733 let none_singleton = if L::is_top_level() {
734 Singleton::new(
735 self.location.clone(),
736 HydroNode::Persist {
737 inner: Box::new(core_ir),
738 metadata: self
739 .location
740 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
741 },
742 )
743 } else {
744 Singleton::new(self.location.clone(), core_ir)
745 };
746
747 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
748 }
749
750 /// An operator which allows you to "name" a `HydroNode`.
751 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
752 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
753 {
754 let mut node = self.ir_node.borrow_mut();
755 let metadata = node.metadata_mut();
756 metadata.tag = Some(name.to_string());
757 }
758 self
759 }
760}
761
762impl<'a, T, L> Optional<T, L, Bounded>
763where
764 L: Location<'a>,
765{
    /// Filters this optional, passing through the optional value if it is non-null **and** the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
    ///
    /// Useful for conditionally processing, such as only emitting an optional's value outside
    /// a tick if some other condition is satisfied.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![456]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick).first()
    ///     .filter_if_some(some_on_first_tick)
    ///     .unwrap_or(tick.singleton(q!(789)))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [789, 789]
    /// # for w in vec![789, 789] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
        // The signal's payload is mapped to `()` before zipping, then the zipped pair is
        // mapped back to just this optional's value.
        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
    }
803
    /// Filters this optional, passing through the optional value if it is non-null **and** the
    /// argument (a [`Bounded`] [`Optional`]) is _null_, otherwise the output is null.
    ///
    /// Useful for conditionally processing, such as only emitting an optional's value outside
    /// a tick if some other condition is satisfied.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![456]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick).first()
    ///     .filter_if_none(some_on_first_tick)
    ///     .unwrap_or(tick.singleton(q!(789)))
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [789, 456]
    /// # for w in vec![789, 456] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
        // `other` is turned into an `Option` singleton and kept only when that `Option` is
        // `None`; the result is then used as the signal for `filter_if_some`.
        self.filter_if_some(
            other
                .map(q!(|_| ()))
                .into_singleton()
                .filter(q!(|o| o.is_none())),
        )
    }
846
    /// If `self` is null, emits a null optional, but if it is non-null, emits `value`.
    ///
    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
    /// having a value, such as only releasing a piece of state if the node is the leader.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// some_on_first_tick
    ///     .if_some_then(tick.singleton(q!(456)))
    ///     .unwrap_or(tick.singleton(q!(123)))
    /// # .all_ticks()
    /// # }, |mut stream| async move {
    /// // 456 (first tick) ~> 123 (second tick onwards)
    /// # for w in vec![456, 123, 123] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
        // Delegates to the singleton's `filter_if_some`, using `self` as the signal.
        value.filter_if_some(self)
    }
876}
877
878impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
879where
880 L: Location<'a> + NoTick,
881{
882 /// Returns an optional value corresponding to the latest snapshot of the optional
883 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
884 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
885 /// all snapshots of this optional into the atomic-associated tick will observe the
886 /// same value each tick.
887 ///
888 /// # Non-Determinism
889 /// Because this picks a snapshot of a optional whose value is continuously changing,
890 /// the output optional has a non-deterministic value since the snapshot can be at an
891 /// arbitrary point in time.
892 pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
893 Optional::new(
894 self.location.clone().tick,
895 HydroNode::Batch {
896 inner: Box::new(self.ir_node.into_inner()),
897 metadata: self
898 .location
899 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
900 },
901 )
902 }
903
904 /// Returns this optional back into a top-level, asynchronous execution context where updates
905 /// to the value will be asynchronously propagated.
906 pub fn end_atomic(self) -> Optional<T, L, B> {
907 Optional::new(
908 self.location.tick.l.clone(),
909 HydroNode::EndAtomic {
910 inner: Box::new(self.ir_node.into_inner()),
911 metadata: self
912 .location
913 .tick
914 .l
915 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
916 },
917 )
918 }
919}
920
921impl<'a, T, L, B: Boundedness> Optional<T, L, B>
922where
923 L: Location<'a>,
924{
925 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
926 /// will observe the same version of the value and will be executed synchronously before any
927 /// outputs are yielded (in [`Optional::end_atomic`]).
928 ///
929 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
930 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
931 /// a different version).
932 ///
933 /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
934 /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
935 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
936 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
937 let out_location = Atomic { tick: tick.clone() };
938 Optional::new(
939 out_location.clone(),
940 HydroNode::BeginAtomic {
941 inner: Box::new(self.ir_node.into_inner()),
942 metadata: out_location
943 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
944 },
945 )
946 }
947
948 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
949 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
950 /// relevant data that contributed to the snapshot at tick `t`.
951 ///
952 /// # Non-Determinism
953 /// Because this picks a snapshot of a optional whose value is continuously changing,
954 /// the output optional has a non-deterministic value since the snapshot can be at an
955 /// arbitrary point in time.
956 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
957 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
958 Optional::new(
959 tick.clone(),
960 HydroNode::Batch {
961 inner: Box::new(self.ir_node.into_inner()),
962 metadata: tick
963 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
964 },
965 )
966 }
967
968 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
969 /// with order corresponding to increasing prefixes of data contributing to the optional.
970 ///
971 /// # Non-Determinism
972 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
973 /// to non-deterministic batching and arrival of inputs, the output stream is
974 /// non-deterministic.
975 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
976 where
977 L: NoTick,
978 {
979 let tick = self.location.tick();
980 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
981 }
982
983 /// Given a time interval, returns a stream corresponding to snapshots of the optional
984 /// value taken at various points in time. Because the input optional may be
985 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
986 /// represent the value of the optional given some prefix of the streams leading up to
987 /// it.
988 ///
989 /// # Non-Determinism
990 /// The output stream is non-deterministic in which elements are sampled, since this
991 /// is controlled by a clock.
992 pub fn sample_every(
993 self,
994 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
995 nondet: NonDet,
996 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
997 where
998 L: NoTick + NoAtomic,
999 {
1000 let samples = self.location.source_interval(interval, nondet);
1001 let tick = self.location.tick();
1002
1003 self.snapshot(&tick, nondet)
1004 .filter_if_some(samples.batch(&tick, nondet).first())
1005 .all_ticks()
1006 .weakest_retries()
1007 }
1008}
1009
1010impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1011where
1012 L: Location<'a>,
1013{
1014 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1015 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1016 /// null values).
1017 ///
1018 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1019 /// producing one element in the output for each (non-null) tick. This is useful for batched
1020 /// computations, where the results from each tick must be combined together.
1021 ///
1022 /// # Example
1023 /// ```rust
1024 /// # use hydro_lang::prelude::*;
1025 /// # use futures::StreamExt;
1026 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1027 /// # let tick = process.tick();
1028 /// # // ticks are lazy by default, forces the second tick to run
1029 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1030 /// # let batch_first_tick = process
1031 /// # .source_iter(q!(vec![]))
1032 /// # .batch(&tick, nondet!(/** test */));
1033 /// # let batch_second_tick = process
1034 /// # .source_iter(q!(vec![1, 2, 3]))
1035 /// # .batch(&tick, nondet!(/** test */))
1036 /// # .defer_tick(); // appears on the second tick
1037 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1038 /// input_batch // first tick: [], second tick: [1, 2, 3]
1039 /// .max()
1040 /// .all_ticks()
1041 /// # }, |mut stream| async move {
1042 /// // [3]
1043 /// # for w in vec![3] {
1044 /// # assert_eq!(stream.next().await.unwrap(), w);
1045 /// # }
1046 /// # }));
1047 /// ```
1048 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1049 self.into_stream().all_ticks()
1050 }
1051
1052 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1053 /// which will stream the value computed in _each_ tick as a separate stream element.
1054 ///
1055 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1056 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1057 /// optional's [`Tick`] context.
1058 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1059 self.into_stream().all_ticks_atomic()
1060 }
1061
1062 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1063 /// be asynchronously updated with the latest value of the optional inside the tick, including
1064 /// whether the optional is null or not.
1065 ///
1066 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1067 /// tick that tracks the inner value. This is useful for getting the value as of the
1068 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1069 ///
1070 /// # Example
1071 /// ```rust
1072 /// # use hydro_lang::prelude::*;
1073 /// # use futures::StreamExt;
1074 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1075 /// # let tick = process.tick();
1076 /// # // ticks are lazy by default, forces the second tick to run
1077 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1078 /// # let batch_first_tick = process
1079 /// # .source_iter(q!(vec![]))
1080 /// # .batch(&tick, nondet!(/** test */));
1081 /// # let batch_second_tick = process
1082 /// # .source_iter(q!(vec![1, 2, 3]))
1083 /// # .batch(&tick, nondet!(/** test */))
1084 /// # .defer_tick(); // appears on the second tick
1085 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1086 /// input_batch // first tick: [], second tick: [1, 2, 3]
1087 /// .max()
1088 /// .latest()
1089 /// # .into_singleton()
1090 /// # .sample_eager(nondet!(/** test */))
1091 /// # }, |mut stream| async move {
1092 /// // asynchronously changes from None ~> 3
1093 /// # for w in vec![None, Some(3)] {
1094 /// # assert_eq!(stream.next().await.unwrap(), w);
1095 /// # }
1096 /// # }));
1097 /// ```
1098 pub fn latest(self) -> Optional<T, L, Unbounded> {
1099 Optional::new(
1100 self.location.outer().clone(),
1101 HydroNode::YieldConcat {
1102 inner: Box::new(self.ir_node.into_inner()),
1103 metadata: self
1104 .location
1105 .outer()
1106 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1107 },
1108 )
1109 }
1110
1111 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1112 /// be updated with the latest value of the optional inside the tick.
1113 ///
1114 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1115 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1116 /// optional's [`Tick`] context.
1117 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1118 let out_location = Atomic {
1119 tick: self.location.clone(),
1120 };
1121
1122 Optional::new(
1123 out_location.clone(),
1124 HydroNode::YieldConcat {
1125 inner: Box::new(self.ir_node.into_inner()),
1126 metadata: out_location
1127 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1128 },
1129 )
1130 }
1131
1132 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1133 /// always has the state of `self` at tick `T - 1`.
1134 ///
1135 /// At tick `0`, the output optional is null, since there is no previous tick.
1136 ///
1137 /// This operator enables stateful iterative processing with ticks, by sending data from one
1138 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1139 ///
1140 /// # Example
1141 /// ```rust
1142 /// # use hydro_lang::prelude::*;
1143 /// # use futures::StreamExt;
1144 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1145 /// let tick = process.tick();
1146 /// // ticks are lazy by default, forces the second tick to run
1147 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1148 ///
1149 /// let batch_first_tick = process
1150 /// .source_iter(q!(vec![1, 2]))
1151 /// .batch(&tick, nondet!(/** test */));
1152 /// let batch_second_tick = process
1153 /// .source_iter(q!(vec![3, 4]))
1154 /// .batch(&tick, nondet!(/** test */))
1155 /// .defer_tick(); // appears on the second tick
1156 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1157 /// .reduce(q!(|state, v| *state += v));
1158 ///
1159 /// current_tick_sum.clone().into_singleton().zip(
1160 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1161 /// ).all_ticks()
1162 /// # }, |mut stream| async move {
1163 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1164 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1165 /// # assert_eq!(stream.next().await.unwrap(), w);
1166 /// # }
1167 /// # }));
1168 /// ```
1169 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1170 Optional::new(
1171 self.location.clone(),
1172 HydroNode::DeferTick {
1173 input: Box::new(self.ir_node.into_inner()),
1174 metadata: self.location.new_node_metadata(Self::collection_kind()),
1175 },
1176 )
1177 }
1178
1179 #[deprecated(note = "use .into_stream().persist()")]
1180 #[expect(missing_docs, reason = "deprecated")]
1181 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1182 where
1183 T: Clone,
1184 {
1185 self.into_stream().persist()
1186 }
1187
1188 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1189 /// non-null. Otherwise, the stream is empty.
1190 ///
1191 /// # Example
1192 /// ```rust
1193 /// # use hydro_lang::prelude::*;
1194 /// # use futures::StreamExt;
1195 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1196 /// # let tick = process.tick();
1197 /// # // ticks are lazy by default, forces the second tick to run
1198 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1199 /// # let batch_first_tick = process
1200 /// # .source_iter(q!(vec![]))
1201 /// # .batch(&tick, nondet!(/** test */));
1202 /// # let batch_second_tick = process
1203 /// # .source_iter(q!(vec![123, 456]))
1204 /// # .batch(&tick, nondet!(/** test */))
1205 /// # .defer_tick(); // appears on the second tick
1206 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1207 /// input_batch // first tick: [], second tick: [123, 456]
1208 /// .clone()
1209 /// .max()
1210 /// .into_stream()
1211 /// .chain(input_batch)
1212 /// .all_ticks()
1213 /// # }, |mut stream| async move {
1214 /// // [456, 123, 456]
1215 /// # for w in vec![456, 123, 456] {
1216 /// # assert_eq!(stream.next().await.unwrap(), w);
1217 /// # }
1218 /// # }));
1219 /// ```
1220 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1221 Stream::new(
1222 self.location.clone(),
1223 HydroNode::Cast {
1224 inner: Box::new(self.ir_node.into_inner()),
1225 metadata: self.location.new_node_metadata(Stream::<
1226 T,
1227 Tick<L>,
1228 Bounded,
1229 TotalOrder,
1230 ExactlyOnce,
1231 >::collection_kind()),
1232 },
1233 )
1234 }
1235}
1236
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use stageleft::q;

    use super::Optional;
    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    /// Combines an inhabited optional with itself via `or` and checks that the resulting
    /// stream has a count of exactly 1.
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let external_process = builder.external::<()>();

        // Build an optional inside a tick that is known to hold a value.
        let tick = process.tick();
        let singleton = tick.singleton(q!(123));
        let inhabited: Optional<_, _, _> = singleton.into();

        // `or` the optional with itself and ship the per-tick element count to the external.
        let count_port = inhabited
            .clone()
            .or(inhabited)
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external_process);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&external_process, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received = deployed.connect(count_port).await;

        deployment.start().await.unwrap();

        assert_eq!(received.next().await.unwrap(), 1);
    }
}