hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{
15    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource, TeeNode,
16};
17#[cfg(stageleft_runtime)]
18use crate::forward_handle::{CycleCollection, ReceiverComplete};
19use crate::forward_handle::{ForwardRef, TickCycle};
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, NoTick, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25
26/// A *nullable* Rust value that can asynchronously change over time.
27///
28/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
29/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
30/// asynchronously change over time, including becoming present of uninhabited.
31///
32/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
33/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
34///
35/// Type Parameters:
36/// - `Type`: the type of the value in this optional (when it is not null)
37/// - `Loc`: the [`Location`] where the optional is materialized
38/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
39pub struct Optional<Type, Loc, Bound: Boundedness> {
40    pub(crate) location: Loc,
41    pub(crate) ir_node: RefCell<HydroNode>,
42
43    _phantom: PhantomData<(Type, Loc, Bound)>,
44}
45
46impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
47where
48    L: Location<'a>,
49{
50    fn defer_tick(self) -> Self {
51        Optional::defer_tick(self)
52    }
53}
54
55impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
56where
57    L: Location<'a>,
58{
59    type Location = Tick<L>;
60
61    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
62        Optional::new(
63            location.clone(),
64            HydroNode::CycleSource {
65                ident,
66                metadata: location.new_node_metadata(Self::collection_kind()),
67            },
68        )
69    }
70}
71
72impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
73where
74    L: Location<'a>,
75{
76    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
77        assert_eq!(
78            Location::id(&self.location),
79            expected_location,
80            "locations do not match"
81        );
82        self.location
83            .flow_state()
84            .borrow_mut()
85            .push_root(HydroRoot::CycleSink {
86                ident,
87                input: Box::new(self.ir_node.into_inner()),
88                op_metadata: HydroIrOpMetadata::new(),
89            });
90    }
91}
92
93impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
94where
95    L: Location<'a>,
96{
97    type Location = Tick<L>;
98
99    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
100        Optional::new(
101            location.clone(),
102            HydroNode::CycleSource {
103                ident,
104                metadata: location.new_node_metadata(Self::collection_kind()),
105            },
106        )
107    }
108}
109
110impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
111where
112    L: Location<'a>,
113{
114    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
115        assert_eq!(
116            Location::id(&self.location),
117            expected_location,
118            "locations do not match"
119        );
120        self.location
121            .flow_state()
122            .borrow_mut()
123            .push_root(HydroRoot::CycleSink {
124                ident,
125                input: Box::new(self.ir_node.into_inner()),
126                op_metadata: HydroIrOpMetadata::new(),
127            });
128    }
129}
130
131impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
132where
133    L: Location<'a> + NoTick,
134{
135    type Location = L;
136
137    fn create_source(ident: syn::Ident, location: L) -> Self {
138        Optional::new(
139            location.clone(),
140            HydroNode::CycleSource {
141                ident,
142                metadata: location.new_node_metadata(Self::collection_kind()),
143            },
144        )
145    }
146}
147
148impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
149where
150    L: Location<'a> + NoTick,
151{
152    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
153        assert_eq!(
154            Location::id(&self.location),
155            expected_location,
156            "locations do not match"
157        );
158        self.location
159            .flow_state()
160            .borrow_mut()
161            .push_root(HydroRoot::CycleSink {
162                ident,
163                input: Box::new(self.ir_node.into_inner()),
164                op_metadata: HydroIrOpMetadata::new(),
165            });
166    }
167}
168
169impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
170where
171    L: Location<'a>,
172{
173    fn from(singleton: Optional<T, L, Bounded>) -> Self {
174        Optional::new(singleton.location, singleton.ir_node.into_inner())
175    }
176}
177
178impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
179where
180    L: Location<'a>,
181{
182    fn from(singleton: Singleton<T, L, B>) -> Self {
183        Optional::new(
184            singleton.location.clone(),
185            HydroNode::Cast {
186                inner: Box::new(singleton.ir_node.into_inner()),
187                metadata: singleton
188                    .location
189                    .new_node_metadata(Self::collection_kind()),
190            },
191        )
192    }
193}
194
195#[cfg(stageleft_runtime)]
196fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
197    me: Optional<T, L, B>,
198    other: Optional<O, L, B>,
199) -> Optional<(T, O), L, B> {
200    check_matching_location(&me.location, &other.location);
201
202    Optional::new(
203        me.location.clone(),
204        HydroNode::CrossSingleton {
205            left: Box::new(me.ir_node.into_inner()),
206            right: Box::new(other.ir_node.into_inner()),
207            metadata: me
208                .location
209                .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
210        },
211    )
212}
213
214#[cfg(stageleft_runtime)]
215fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
216    me: Optional<T, L, B>,
217    other: Optional<T, L, B>,
218) -> Optional<T, L, B> {
219    check_matching_location(&me.location, &other.location);
220
221    Optional::new(
222        me.location.clone(),
223        HydroNode::ChainFirst {
224            first: Box::new(me.ir_node.into_inner()),
225            second: Box::new(other.ir_node.into_inner()),
226            metadata: me
227                .location
228                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
229        },
230    )
231}
232
233impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
234where
235    T: Clone,
236    L: Location<'a>,
237{
238    fn clone(&self) -> Self {
239        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
240            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
241            *self.ir_node.borrow_mut() = HydroNode::Tee {
242                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
243                metadata: self.location.new_node_metadata(Self::collection_kind()),
244            };
245        }
246
247        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
248            Optional {
249                location: self.location.clone(),
250                ir_node: HydroNode::Tee {
251                    inner: TeeNode(inner.0.clone()),
252                    metadata: metadata.clone(),
253                }
254                .into(),
255                _phantom: PhantomData,
256            }
257        } else {
258            unreachable!()
259        }
260    }
261}
262
263impl<'a, T, L, B: Boundedness> Optional<T, L, B>
264where
265    L: Location<'a>,
266{
267    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
268        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
269        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
270        Optional {
271            location,
272            ir_node: RefCell::new(ir_node),
273            _phantom: PhantomData,
274        }
275    }
276
277    pub(crate) fn collection_kind() -> CollectionKind {
278        CollectionKind::Optional {
279            bound: B::BOUND_KIND,
280            element_type: stageleft::quote_type::<T>().into(),
281        }
282    }
283
284    /// Transforms the optional value by applying a function `f` to it,
285    /// continuously as the input is updated.
286    ///
287    /// Whenever the optional is empty, the output optional is also empty.
288    ///
289    /// # Example
290    /// ```rust
291    /// # use hydro_lang::prelude::*;
292    /// # use futures::StreamExt;
293    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
294    /// let tick = process.tick();
295    /// let optional = tick.optional_first_tick(q!(1));
296    /// optional.map(q!(|v| v + 1)).all_ticks()
297    /// # }, |mut stream| async move {
298    /// // 2
299    /// # assert_eq!(stream.next().await.unwrap(), 2);
300    /// # }));
301    /// ```
302    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
303    where
304        F: Fn(T) -> U + 'a,
305    {
306        let f = f.splice_fn1_ctx(&self.location).into();
307        Optional::new(
308            self.location.clone(),
309            HydroNode::Map {
310                f,
311                input: Box::new(self.ir_node.into_inner()),
312                metadata: self
313                    .location
314                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
315            },
316        )
317    }
318
319    /// Transforms the optional value by applying a function `f` to it and then flattening
320    /// the result into a stream, preserving the order of elements.
321    ///
322    /// If the optional is empty, the output stream is also empty. If the optional contains
323    /// a value, `f` is applied to produce an iterator, and all items from that iterator
324    /// are emitted in the output stream in deterministic order.
325    ///
326    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
327    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
328    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
329    ///
330    /// # Example
331    /// ```rust
332    /// # use hydro_lang::prelude::*;
333    /// # use futures::StreamExt;
334    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
335    /// let tick = process.tick();
336    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
337    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
338    /// # }, |mut stream| async move {
339    /// // 1, 2, 3
340    /// # for w in vec![1, 2, 3] {
341    /// #     assert_eq!(stream.next().await.unwrap(), w);
342    /// # }
343    /// # }));
344    /// ```
345    pub fn flat_map_ordered<U, I, F>(
346        self,
347        f: impl IntoQuotedMut<'a, F, L>,
348    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
349    where
350        I: IntoIterator<Item = U>,
351        F: Fn(T) -> I + 'a,
352    {
353        let f = f.splice_fn1_ctx(&self.location).into();
354        Stream::new(
355            self.location.clone(),
356            HydroNode::FlatMap {
357                f,
358                input: Box::new(self.ir_node.into_inner()),
359                metadata: self.location.new_node_metadata(
360                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
361                ),
362            },
363        )
364    }
365
366    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
367    /// for the output type `I` to produce items in any order.
368    ///
369    /// If the optional is empty, the output stream is also empty. If the optional contains
370    /// a value, `f` is applied to produce an iterator, and all items from that iterator
371    /// are emitted in the output stream in non-deterministic order.
372    ///
373    /// # Example
374    /// ```rust
375    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
376    /// # use futures::StreamExt;
377    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
378    /// let tick = process.tick();
379    /// let optional = tick.optional_first_tick(q!(
380    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
381    /// ));
382    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
383    /// # }, |mut stream| async move {
384    /// // 1, 2, 3, but in no particular order
385    /// # let mut results = Vec::new();
386    /// # for _ in 0..3 {
387    /// #     results.push(stream.next().await.unwrap());
388    /// # }
389    /// # results.sort();
390    /// # assert_eq!(results, vec![1, 2, 3]);
391    /// # }));
392    /// ```
393    pub fn flat_map_unordered<U, I, F>(
394        self,
395        f: impl IntoQuotedMut<'a, F, L>,
396    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
397    where
398        I: IntoIterator<Item = U>,
399        F: Fn(T) -> I + 'a,
400    {
401        let f = f.splice_fn1_ctx(&self.location).into();
402        Stream::new(
403            self.location.clone(),
404            HydroNode::FlatMap {
405                f,
406                input: Box::new(self.ir_node.into_inner()),
407                metadata: self
408                    .location
409                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
410            },
411        )
412    }
413
414    /// Flattens the optional value into a stream, preserving the order of elements.
415    ///
416    /// If the optional is empty, the output stream is also empty. If the optional contains
417    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
418    /// in the output stream in deterministic order.
419    ///
420    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
421    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
422    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
423    ///
424    /// # Example
425    /// ```rust
426    /// # use hydro_lang::prelude::*;
427    /// # use futures::StreamExt;
428    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
429    /// let tick = process.tick();
430    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
431    /// optional.flatten_ordered().all_ticks()
432    /// # }, |mut stream| async move {
433    /// // 1, 2, 3
434    /// # for w in vec![1, 2, 3] {
435    /// #     assert_eq!(stream.next().await.unwrap(), w);
436    /// # }
437    /// # }));
438    /// ```
439    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
440    where
441        T: IntoIterator<Item = U>,
442    {
443        self.flat_map_ordered(q!(|v| v))
444    }
445
446    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
447    /// for the element type `T` to produce items in any order.
448    ///
449    /// If the optional is empty, the output stream is also empty. If the optional contains
450    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
451    /// in the output stream in non-deterministic order.
452    ///
453    /// # Example
454    /// ```rust
455    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
456    /// # use futures::StreamExt;
457    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
458    /// let tick = process.tick();
459    /// let optional = tick.optional_first_tick(q!(
460    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
461    /// ));
462    /// optional.flatten_unordered().all_ticks()
463    /// # }, |mut stream| async move {
464    /// // 1, 2, 3, but in no particular order
465    /// # let mut results = Vec::new();
466    /// # for _ in 0..3 {
467    /// #     results.push(stream.next().await.unwrap());
468    /// # }
469    /// # results.sort();
470    /// # assert_eq!(results, vec![1, 2, 3]);
471    /// # }));
472    /// ```
473    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
474    where
475        T: IntoIterator<Item = U>,
476    {
477        self.flat_map_unordered(q!(|v| v))
478    }
479
480    /// Creates an optional containing only the value if it satisfies a predicate `f`.
481    ///
482    /// If the optional is empty, the output optional is also empty. If the optional contains
483    /// a value and the predicate returns `true`, the output optional contains the same value.
484    /// If the predicate returns `false`, the output optional is empty.
485    ///
486    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
487    /// not modify or take ownership of the value. If you need to modify the value while filtering
488    /// use [`Optional::filter_map`] instead.
489    ///
490    /// # Example
491    /// ```rust
492    /// # use hydro_lang::prelude::*;
493    /// # use futures::StreamExt;
494    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
495    /// let tick = process.tick();
496    /// let optional = tick.optional_first_tick(q!(5));
497    /// optional.filter(q!(|&x| x > 3)).all_ticks()
498    /// # }, |mut stream| async move {
499    /// // 5
500    /// # assert_eq!(stream.next().await.unwrap(), 5);
501    /// # }));
502    /// ```
503    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
504    where
505        F: Fn(&T) -> bool + 'a,
506    {
507        let f = f.splice_fn1_borrow_ctx(&self.location).into();
508        Optional::new(
509            self.location.clone(),
510            HydroNode::Filter {
511                f,
512                input: Box::new(self.ir_node.into_inner()),
513                metadata: self.location.new_node_metadata(Self::collection_kind()),
514            },
515        )
516    }
517
518    /// An operator that both filters and maps. It yields only the value if the supplied
519    /// closure `f` returns `Some(value)`.
520    ///
521    /// If the optional is empty, the output optional is also empty. If the optional contains
522    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
523    /// If the closure returns `None`, the output optional is empty.
524    ///
525    /// # Example
526    /// ```rust
527    /// # use hydro_lang::prelude::*;
528    /// # use futures::StreamExt;
529    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
530    /// let tick = process.tick();
531    /// let optional = tick.optional_first_tick(q!("42"));
532    /// optional
533    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
534    ///     .all_ticks()
535    /// # }, |mut stream| async move {
536    /// // 42
537    /// # assert_eq!(stream.next().await.unwrap(), 42);
538    /// # }));
539    /// ```
540    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
541    where
542        F: Fn(T) -> Option<U> + 'a,
543    {
544        let f = f.splice_fn1_ctx(&self.location).into();
545        Optional::new(
546            self.location.clone(),
547            HydroNode::FilterMap {
548                f,
549                input: Box::new(self.ir_node.into_inner()),
550                metadata: self
551                    .location
552                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
553            },
554        )
555    }
556
557    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
558    ///
559    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
560    /// non-null. This is useful for combining several pieces of state together.
561    ///
562    /// # Example
563    /// ```rust
564    /// # use hydro_lang::prelude::*;
565    /// # use futures::StreamExt;
566    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
567    /// let tick = process.tick();
568    /// let numbers = process
569    ///   .source_iter(q!(vec![123, 456, 789]))
570    ///   .batch(&tick, nondet!(/** test */));
571    /// let min = numbers.clone().min(); // Optional
572    /// let max = numbers.max(); // Optional
573    /// min.zip(max).all_ticks()
574    /// # }, |mut stream| async move {
575    /// // [(123, 789)]
576    /// # for w in vec![(123, 789)] {
577    /// #     assert_eq!(stream.next().await.unwrap(), w);
578    /// # }
579    /// # }));
580    /// ```
581    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
582    where
583        O: Clone,
584    {
585        let other: Optional<O, L, B> = other.into();
586        check_matching_location(&self.location, &other.location);
587
588        if L::is_top_level()
589            && let Some(tick) = self.location.try_tick()
590        {
591            let out = zip_inside_tick(
592                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
593                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
594            )
595            .latest();
596
597            Optional::new(out.location, out.ir_node.into_inner())
598        } else {
599            zip_inside_tick(self, other)
600        }
601    }
602
603    /// Passes through `self` when it has a value, otherwise passes through `other`.
604    ///
605    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
606    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
607    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
608    ///
609    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
610    /// of the inputs change (including to/from null states).
611    ///
612    /// # Example
613    /// ```rust
614    /// # use hydro_lang::prelude::*;
615    /// # use futures::StreamExt;
616    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
617    /// let tick = process.tick();
618    /// // ticks are lazy by default, forces the second tick to run
619    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
620    ///
621    /// let some_first_tick = tick.optional_first_tick(q!(123));
622    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
623    /// some_first_tick.or(some_second_tick).all_ticks()
624    /// # }, |mut stream| async move {
625    /// // [123 /* first tick */, 456 /* second tick */]
626    /// # for w in vec![123, 456] {
627    /// #     assert_eq!(stream.next().await.unwrap(), w);
628    /// # }
629    /// # }));
630    /// ```
631    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
632        check_matching_location(&self.location, &other.location);
633
634        if L::is_top_level()
635            && let Some(tick) = self.location.try_tick()
636        {
637            let out = or_inside_tick(
638                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
639                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
640            )
641            .latest();
642
643            Optional::new(out.location, out.ir_node.into_inner())
644        } else {
645            Optional::new(
646                self.location.clone(),
647                HydroNode::ChainFirst {
648                    first: Box::new(self.ir_node.into_inner()),
649                    second: Box::new(other.ir_node.into_inner()),
650                    metadata: self.location.new_node_metadata(Self::collection_kind()),
651                },
652            )
653        }
654    }
655
656    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
657    ///
658    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
659    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
660    ///
661    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
662    /// of the inputs change (including to/from null states).
663    ///
664    /// # Example
665    /// ```rust
666    /// # use hydro_lang::prelude::*;
667    /// # use futures::StreamExt;
668    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
669    /// let tick = process.tick();
670    /// // ticks are lazy by default, forces the later ticks to run
671    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
672    ///
673    /// let some_first_tick = tick.optional_first_tick(q!(123));
674    /// some_first_tick
675    ///     .unwrap_or(tick.singleton(q!(456)))
676    ///     .all_ticks()
677    /// # }, |mut stream| async move {
678    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
679    /// # for w in vec![123, 456, 456, 456] {
680    /// #     assert_eq!(stream.next().await.unwrap(), w);
681    /// # }
682    /// # }));
683    /// ```
684    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
685        let res_option = self.or(other.into());
686        Singleton::new(
687            res_option.location.clone(),
688            HydroNode::Cast {
689                inner: Box::new(res_option.ir_node.into_inner()),
690                metadata: res_option
691                    .location
692                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
693            },
694        )
695    }
696
697    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
698    ///
699    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
700    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
701    /// so that Hydro can skip any computation on null values.
702    ///
703    /// # Example
704    /// ```rust
705    /// # use hydro_lang::prelude::*;
706    /// # use futures::StreamExt;
707    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
708    /// let tick = process.tick();
709    /// // ticks are lazy by default, forces the later ticks to run
710    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
711    ///
712    /// let some_first_tick = tick.optional_first_tick(q!(123));
713    /// some_first_tick.into_singleton().all_ticks()
714    /// # }, |mut stream| async move {
715    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
716    /// # for w in vec![Some(123), None, None, None] {
717    /// #     assert_eq!(stream.next().await.unwrap(), w);
718    /// # }
719    /// # }));
720    /// ```
721    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
722    where
723        T: Clone,
724    {
725        let none: syn::Expr = parse_quote!([::std::option::Option::None]);
726        let core_ir = HydroNode::Source {
727            source: HydroSource::Iter(none.into()),
728            metadata: self
729                .location
730                .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
731        };
732
733        let none_singleton = if L::is_top_level() {
734            Singleton::new(
735                self.location.clone(),
736                HydroNode::Persist {
737                    inner: Box::new(core_ir),
738                    metadata: self
739                        .location
740                        .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
741                },
742            )
743        } else {
744            Singleton::new(self.location.clone(), core_ir)
745        };
746
747        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
748    }
749
750    /// An operator which allows you to "name" a `HydroNode`.
751    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
752    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
753        {
754            let mut node = self.ir_node.borrow_mut();
755            let metadata = node.metadata_mut();
756            metadata.tag = Some(name.to_string());
757        }
758        self
759    }
760}
761
762impl<'a, T, L> Optional<T, L, Bounded>
763where
764    L: Location<'a>,
765{
766    /// Filters this optional, passing through the optional value if it is non-null **and** the
767    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
768    ///
769    /// Useful for conditionally processing, such as only emitting an optional's value outside
770    /// a tick if some other condition is satisfied.
771    ///
772    /// # Example
773    /// ```rust
774    /// # use hydro_lang::prelude::*;
775    /// # use futures::StreamExt;
776    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
777    /// let tick = process.tick();
778    /// // ticks are lazy by default, forces the second tick to run
779    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
780    ///
781    /// let batch_first_tick = process
782    ///   .source_iter(q!(vec![]))
783    ///   .batch(&tick, nondet!(/** test */));
784    /// let batch_second_tick = process
785    ///   .source_iter(q!(vec![456]))
786    ///   .batch(&tick, nondet!(/** test */))
787    ///   .defer_tick(); // appears on the second tick
788    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
789    /// batch_first_tick.chain(batch_second_tick).first()
790    ///   .filter_if_some(some_on_first_tick)
791    ///   .unwrap_or(tick.singleton(q!(789)))
792    ///   .all_ticks()
793    /// # }, |mut stream| async move {
794    /// // [789, 789]
795    /// # for w in vec![789, 789] {
796    /// #     assert_eq!(stream.next().await.unwrap(), w);
797    /// # }
798    /// # }));
799    /// ```
800    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
801        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
802    }
803
804    /// Filters this optional, passing through the optional value if it is non-null **and** the
805    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
806    ///
807    /// Useful for conditionally processing, such as only emitting an optional's value outside
808    /// a tick if some other condition is satisfied.
809    ///
810    /// # Example
811    /// ```rust
812    /// # use hydro_lang::prelude::*;
813    /// # use futures::StreamExt;
814    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
815    /// let tick = process.tick();
816    /// // ticks are lazy by default, forces the second tick to run
817    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
818    ///
819    /// let batch_first_tick = process
820    ///   .source_iter(q!(vec![]))
821    ///   .batch(&tick, nondet!(/** test */));
822    /// let batch_second_tick = process
823    ///   .source_iter(q!(vec![456]))
824    ///   .batch(&tick, nondet!(/** test */))
825    ///   .defer_tick(); // appears on the second tick
826    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
827    /// batch_first_tick.chain(batch_second_tick).first()
828    ///   .filter_if_none(some_on_first_tick)
829    ///   .unwrap_or(tick.singleton(q!(789)))
830    ///   .all_ticks()
831    /// # }, |mut stream| async move {
832    /// // [789, 789]
833    /// # for w in vec![789, 456] {
834    /// #     assert_eq!(stream.next().await.unwrap(), w);
835    /// # }
836    /// # }));
837    /// ```
838    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
839        self.filter_if_some(
840            other
841                .map(q!(|_| ()))
842                .into_singleton()
843                .filter(q!(|o| o.is_none())),
844        )
845    }
846
847    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
848    ///
849    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
850    /// having a value, such as only releasing a piece of state if the node is the leader.
851    ///
852    /// # Example
853    /// ```rust
854    /// # use hydro_lang::prelude::*;
855    /// # use futures::StreamExt;
856    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
857    /// let tick = process.tick();
858    /// // ticks are lazy by default, forces the second tick to run
859    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
860    ///
861    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
862    /// some_on_first_tick
863    ///     .if_some_then(tick.singleton(q!(456)))
864    ///     .unwrap_or(tick.singleton(q!(123)))
865    /// # .all_ticks()
866    /// # }, |mut stream| async move {
867    /// // 456 (first tick) ~> 123 (second tick onwards)
868    /// # for w in vec![456, 123, 123] {
869    /// #     assert_eq!(stream.next().await.unwrap(), w);
870    /// # }
871    /// # }));
872    /// ```
873    pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
874        value.filter_if_some(self)
875    }
876}
877
878impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
879where
880    L: Location<'a> + NoTick,
881{
882    /// Returns an optional value corresponding to the latest snapshot of the optional
883    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
884    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
885    /// all snapshots of this optional into the atomic-associated tick will observe the
886    /// same value each tick.
887    ///
888    /// # Non-Determinism
889    /// Because this picks a snapshot of a optional whose value is continuously changing,
890    /// the output optional has a non-deterministic value since the snapshot can be at an
891    /// arbitrary point in time.
892    pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
893        Optional::new(
894            self.location.clone().tick,
895            HydroNode::Batch {
896                inner: Box::new(self.ir_node.into_inner()),
897                metadata: self
898                    .location
899                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
900            },
901        )
902    }
903
904    /// Returns this optional back into a top-level, asynchronous execution context where updates
905    /// to the value will be asynchronously propagated.
906    pub fn end_atomic(self) -> Optional<T, L, B> {
907        Optional::new(
908            self.location.tick.l.clone(),
909            HydroNode::EndAtomic {
910                inner: Box::new(self.ir_node.into_inner()),
911                metadata: self
912                    .location
913                    .tick
914                    .l
915                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
916            },
917        )
918    }
919}
920
921impl<'a, T, L, B: Boundedness> Optional<T, L, B>
922where
923    L: Location<'a>,
924{
925    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
926    /// will observe the same version of the value and will be executed synchronously before any
927    /// outputs are yielded (in [`Optional::end_atomic`]).
928    ///
929    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
930    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
931    /// a different version).
932    ///
933    /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
934    /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
935    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
936    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
937        let out_location = Atomic { tick: tick.clone() };
938        Optional::new(
939            out_location.clone(),
940            HydroNode::BeginAtomic {
941                inner: Box::new(self.ir_node.into_inner()),
942                metadata: out_location
943                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
944            },
945        )
946    }
947
948    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
949    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
950    /// relevant data that contributed to the snapshot at tick `t`.
951    ///
952    /// # Non-Determinism
953    /// Because this picks a snapshot of a optional whose value is continuously changing,
954    /// the output optional has a non-deterministic value since the snapshot can be at an
955    /// arbitrary point in time.
956    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
957        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
958        Optional::new(
959            tick.clone(),
960            HydroNode::Batch {
961                inner: Box::new(self.ir_node.into_inner()),
962                metadata: tick
963                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
964            },
965        )
966    }
967
968    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
969    /// with order corresponding to increasing prefixes of data contributing to the optional.
970    ///
971    /// # Non-Determinism
972    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
973    /// to non-deterministic batching and arrival of inputs, the output stream is
974    /// non-deterministic.
975    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
976    where
977        L: NoTick,
978    {
979        let tick = self.location.tick();
980        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
981    }
982
983    /// Given a time interval, returns a stream corresponding to snapshots of the optional
984    /// value taken at various points in time. Because the input optional may be
985    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
986    /// represent the value of the optional given some prefix of the streams leading up to
987    /// it.
988    ///
989    /// # Non-Determinism
990    /// The output stream is non-deterministic in which elements are sampled, since this
991    /// is controlled by a clock.
992    pub fn sample_every(
993        self,
994        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
995        nondet: NonDet,
996    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
997    where
998        L: NoTick + NoAtomic,
999    {
1000        let samples = self.location.source_interval(interval, nondet);
1001        let tick = self.location.tick();
1002
1003        self.snapshot(&tick, nondet)
1004            .filter_if_some(samples.batch(&tick, nondet).first())
1005            .all_ticks()
1006            .weakest_retries()
1007    }
1008}
1009
1010impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1011where
1012    L: Location<'a>,
1013{
1014    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1015    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1016    /// null values).
1017    ///
1018    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1019    /// producing one element in the output for each (non-null) tick. This is useful for batched
1020    /// computations, where the results from each tick must be combined together.
1021    ///
1022    /// # Example
1023    /// ```rust
1024    /// # use hydro_lang::prelude::*;
1025    /// # use futures::StreamExt;
1026    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1027    /// # let tick = process.tick();
1028    /// # // ticks are lazy by default, forces the second tick to run
1029    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1030    /// # let batch_first_tick = process
1031    /// #   .source_iter(q!(vec![]))
1032    /// #   .batch(&tick, nondet!(/** test */));
1033    /// # let batch_second_tick = process
1034    /// #   .source_iter(q!(vec![1, 2, 3]))
1035    /// #   .batch(&tick, nondet!(/** test */))
1036    /// #   .defer_tick(); // appears on the second tick
1037    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1038    /// input_batch // first tick: [], second tick: [1, 2, 3]
1039    ///     .max()
1040    ///     .all_ticks()
1041    /// # }, |mut stream| async move {
1042    /// // [3]
1043    /// # for w in vec![3] {
1044    /// #     assert_eq!(stream.next().await.unwrap(), w);
1045    /// # }
1046    /// # }));
1047    /// ```
1048    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1049        self.into_stream().all_ticks()
1050    }
1051
1052    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1053    /// which will stream the value computed in _each_ tick as a separate stream element.
1054    ///
1055    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1056    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1057    /// optional's [`Tick`] context.
1058    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1059        self.into_stream().all_ticks_atomic()
1060    }
1061
1062    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1063    /// be asynchronously updated with the latest value of the optional inside the tick, including
1064    /// whether the optional is null or not.
1065    ///
1066    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1067    /// tick that tracks the inner value. This is useful for getting the value as of the
1068    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1069    ///
1070    /// # Example
1071    /// ```rust
1072    /// # use hydro_lang::prelude::*;
1073    /// # use futures::StreamExt;
1074    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1075    /// # let tick = process.tick();
1076    /// # // ticks are lazy by default, forces the second tick to run
1077    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1078    /// # let batch_first_tick = process
1079    /// #   .source_iter(q!(vec![]))
1080    /// #   .batch(&tick, nondet!(/** test */));
1081    /// # let batch_second_tick = process
1082    /// #   .source_iter(q!(vec![1, 2, 3]))
1083    /// #   .batch(&tick, nondet!(/** test */))
1084    /// #   .defer_tick(); // appears on the second tick
1085    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1086    /// input_batch // first tick: [], second tick: [1, 2, 3]
1087    ///     .max()
1088    ///     .latest()
1089    /// # .into_singleton()
1090    /// # .sample_eager(nondet!(/** test */))
1091    /// # }, |mut stream| async move {
1092    /// // asynchronously changes from None ~> 3
1093    /// # for w in vec![None, Some(3)] {
1094    /// #     assert_eq!(stream.next().await.unwrap(), w);
1095    /// # }
1096    /// # }));
1097    /// ```
1098    pub fn latest(self) -> Optional<T, L, Unbounded> {
1099        Optional::new(
1100            self.location.outer().clone(),
1101            HydroNode::YieldConcat {
1102                inner: Box::new(self.ir_node.into_inner()),
1103                metadata: self
1104                    .location
1105                    .outer()
1106                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1107            },
1108        )
1109    }
1110
1111    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1112    /// be updated with the latest value of the optional inside the tick.
1113    ///
1114    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1115    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1116    /// optional's [`Tick`] context.
1117    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1118        let out_location = Atomic {
1119            tick: self.location.clone(),
1120        };
1121
1122        Optional::new(
1123            out_location.clone(),
1124            HydroNode::YieldConcat {
1125                inner: Box::new(self.ir_node.into_inner()),
1126                metadata: out_location
1127                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1128            },
1129        )
1130    }
1131
1132    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1133    /// always has the state of `self` at tick `T - 1`.
1134    ///
1135    /// At tick `0`, the output optional is null, since there is no previous tick.
1136    ///
1137    /// This operator enables stateful iterative processing with ticks, by sending data from one
1138    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1139    ///
1140    /// # Example
1141    /// ```rust
1142    /// # use hydro_lang::prelude::*;
1143    /// # use futures::StreamExt;
1144    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1145    /// let tick = process.tick();
1146    /// // ticks are lazy by default, forces the second tick to run
1147    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1148    ///
1149    /// let batch_first_tick = process
1150    ///   .source_iter(q!(vec![1, 2]))
1151    ///   .batch(&tick, nondet!(/** test */));
1152    /// let batch_second_tick = process
1153    ///   .source_iter(q!(vec![3, 4]))
1154    ///   .batch(&tick, nondet!(/** test */))
1155    ///   .defer_tick(); // appears on the second tick
1156    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1157    ///   .reduce(q!(|state, v| *state += v));
1158    ///
1159    /// current_tick_sum.clone().into_singleton().zip(
1160    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1161    /// ).all_ticks()
1162    /// # }, |mut stream| async move {
1163    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1164    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1165    /// #     assert_eq!(stream.next().await.unwrap(), w);
1166    /// # }
1167    /// # }));
1168    /// ```
1169    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1170        Optional::new(
1171            self.location.clone(),
1172            HydroNode::DeferTick {
1173                input: Box::new(self.ir_node.into_inner()),
1174                metadata: self.location.new_node_metadata(Self::collection_kind()),
1175            },
1176        )
1177    }
1178
1179    #[deprecated(note = "use .into_stream().persist()")]
1180    #[expect(missing_docs, reason = "deprecated")]
1181    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1182    where
1183        T: Clone,
1184    {
1185        self.into_stream().persist()
1186    }
1187
1188    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1189    /// non-null. Otherwise, the stream is empty.
1190    ///
1191    /// # Example
1192    /// ```rust
1193    /// # use hydro_lang::prelude::*;
1194    /// # use futures::StreamExt;
1195    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1196    /// # let tick = process.tick();
1197    /// # // ticks are lazy by default, forces the second tick to run
1198    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1199    /// # let batch_first_tick = process
1200    /// #   .source_iter(q!(vec![]))
1201    /// #   .batch(&tick, nondet!(/** test */));
1202    /// # let batch_second_tick = process
1203    /// #   .source_iter(q!(vec![123, 456]))
1204    /// #   .batch(&tick, nondet!(/** test */))
1205    /// #   .defer_tick(); // appears on the second tick
1206    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1207    /// input_batch // first tick: [], second tick: [123, 456]
1208    ///     .clone()
1209    ///     .max()
1210    ///     .into_stream()
1211    ///     .chain(input_batch)
1212    ///     .all_ticks()
1213    /// # }, |mut stream| async move {
1214    /// // [456, 123, 456]
1215    /// # for w in vec![456, 123, 456] {
1216    /// #     assert_eq!(stream.next().await.unwrap(), w);
1217    /// # }
1218    /// # }));
1219    /// ```
1220    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1221        Stream::new(
1222            self.location.clone(),
1223            HydroNode::Cast {
1224                inner: Box::new(self.ir_node.into_inner()),
1225                metadata: self.location.new_node_metadata(Stream::<
1226                    T,
1227                    Tick<L>,
1228                    Bounded,
1229                    TotalOrder,
1230                    ExactlyOnce,
1231                >::collection_kind()),
1232            },
1233        )
1234    }
1235}
1236
1237#[cfg(test)]
1238mod tests {
1239    use futures::StreamExt;
1240    use hydro_deploy::Deployment;
1241    use stageleft::q;
1242
1243    use super::Optional;
1244    use crate::compile::builder::FlowBuilder;
1245    use crate::location::Location;
1246
1247    #[tokio::test]
1248    async fn optional_or_cardinality() {
1249        let mut deployment = Deployment::new();
1250
1251        let flow = FlowBuilder::new();
1252        let node = flow.process::<()>();
1253        let external = flow.external::<()>();
1254
1255        let node_tick = node.tick();
1256        let tick_singleton = node_tick.singleton(q!(123));
1257        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1258        let counts = tick_optional_inhabited
1259            .clone()
1260            .or(tick_optional_inhabited)
1261            .into_stream()
1262            .count()
1263            .all_ticks()
1264            .send_bincode_external(&external);
1265
1266        let nodes = flow
1267            .with_process(&node, deployment.Localhost())
1268            .with_external(&external, deployment.Localhost())
1269            .deploy(&mut deployment);
1270
1271        deployment.deploy().await.unwrap();
1272
1273        let mut external_out = nodes.connect(counts).await;
1274
1275        deployment.start().await.unwrap();
1276
1277        assert_eq!(external_out.next().await.unwrap(), 1);
1278    }
1279}