hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
/// A *nullable* Rust value that can asynchronously change over time.
///
/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
///
/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
///
/// Type Parameters:
/// - `Type`: the type of the value in this optional (when it is not null)
/// - `Loc`: the [`Location`] where the optional is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    // The location this optional is materialized at.
    pub(crate) location: Loc,
    // The IR node producing this optional. It sits behind a `RefCell` because `Clone::clone`
    // only receives `&self` yet has to rewrite the node into a `Tee`.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
43
// Exposes tick deferral for bounded optionals inside a tick through the `DeferTick` trait.
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    // Forwards to `Optional::defer_tick`.
    // NOTE(review): this presumably resolves to an inherent `defer_tick` method defined
    // outside this section of the file — confirm, otherwise the call would recurse.
    fn defer_tick(self) -> Self {
        Optional::defer_tick(self)
    }
}
52
53impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
54where
55    L: Location<'a>,
56{
57    type Location = Tick<L>;
58
59    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
60        Optional::new(
61            location.clone(),
62            HydroNode::CycleSource {
63                ident,
64                metadata: location.new_node_metadata(Self::collection_kind()),
65            },
66        )
67    }
68}
69
70impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
71where
72    L: Location<'a>,
73{
74    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75        assert_eq!(
76            Location::id(&self.location),
77            expected_location,
78            "locations do not match"
79        );
80        self.location
81            .flow_state()
82            .borrow_mut()
83            .push_root(HydroRoot::CycleSink {
84                ident,
85                input: Box::new(self.ir_node.into_inner()),
86                op_metadata: HydroIrOpMetadata::new(),
87            });
88    }
89}
90
91impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
92where
93    L: Location<'a>,
94{
95    type Location = Tick<L>;
96
97    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
98        Optional::new(
99            location.clone(),
100            HydroNode::CycleSource {
101                ident,
102                metadata: location.new_node_metadata(Self::collection_kind()),
103            },
104        )
105    }
106}
107
108impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
109where
110    L: Location<'a>,
111{
112    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
113        assert_eq!(
114            Location::id(&self.location),
115            expected_location,
116            "locations do not match"
117        );
118        self.location
119            .flow_state()
120            .borrow_mut()
121            .push_root(HydroRoot::CycleSink {
122                ident,
123                input: Box::new(self.ir_node.into_inner()),
124                op_metadata: HydroIrOpMetadata::new(),
125            });
126    }
127}
128
129impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
130where
131    L: Location<'a> + NoTick,
132{
133    type Location = L;
134
135    fn create_source(ident: syn::Ident, location: L) -> Self {
136        Optional::new(
137            location.clone(),
138            HydroNode::CycleSource {
139                ident,
140                metadata: location.new_node_metadata(Self::collection_kind()),
141            },
142        )
143    }
144}
145
146impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
147where
148    L: Location<'a> + NoTick,
149{
150    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
151        assert_eq!(
152            Location::id(&self.location),
153            expected_location,
154            "locations do not match"
155        );
156        self.location
157            .flow_state()
158            .borrow_mut()
159            .push_root(HydroRoot::CycleSink {
160                ident,
161                input: Box::new(self.ir_node.into_inner()),
162                op_metadata: HydroIrOpMetadata::new(),
163            });
164    }
165}
166
167impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
168where
169    L: Location<'a>,
170{
171    fn from(singleton: Optional<T, L, Bounded>) -> Self {
172        Optional::new(singleton.location, singleton.ir_node.into_inner())
173    }
174}
175
176impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
177where
178    L: Location<'a>,
179{
180    fn from(singleton: Singleton<T, L, B>) -> Self {
181        Optional::new(
182            singleton.location.clone(),
183            HydroNode::Cast {
184                inner: Box::new(singleton.ir_node.into_inner()),
185                metadata: singleton
186                    .location
187                    .new_node_metadata(Self::collection_kind()),
188            },
189        )
190    }
191}
192
// Pairs two optionals at the same location through a `CrossSingleton` node.
// Panics (via `check_matching_location`) when the two locations differ.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());
    let crossed = HydroNode::CrossSingleton {
        left: Box::new(me.ir_node.into_inner()),
        right: Box::new(other.ir_node.into_inner()),
        metadata,
    };

    Optional::new(me.location, crossed)
}
211
// Combines two optionals at the same location through a `ChainFirst` node, with `me` as the
// first input and `other` as the second.
// Panics (via `check_matching_location`) when the two locations differ.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());
    let chained = HydroNode::ChainFirst {
        first: Box::new(me.ir_node.into_inner()),
        second: Box::new(other.ir_node.into_inner()),
        metadata,
    };

    Optional::new(me.location, chained)
}
230
231impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
232where
233    T: Clone,
234    L: Location<'a>,
235{
236    fn clone(&self) -> Self {
237        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
238            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
239            *self.ir_node.borrow_mut() = HydroNode::Tee {
240                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
241                metadata: self.location.new_node_metadata(Self::collection_kind()),
242            };
243        }
244
245        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
246            Optional {
247                location: self.location.clone(),
248                ir_node: HydroNode::Tee {
249                    inner: TeeNode(inner.0.clone()),
250                    metadata: metadata.clone(),
251                }
252                .into(),
253                _phantom: PhantomData,
254            }
255        } else {
256            unreachable!()
257        }
258    }
259}
260
261impl<'a, T, L, B: Boundedness> Optional<T, L, B>
262where
263    L: Location<'a>,
264{
265    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
266        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
267        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
268        Optional {
269            location,
270            ir_node: RefCell::new(ir_node),
271            _phantom: PhantomData,
272        }
273    }
274
275    pub(crate) fn collection_kind() -> CollectionKind {
276        CollectionKind::Optional {
277            bound: B::BOUND_KIND,
278            element_type: stageleft::quote_type::<T>().into(),
279        }
280    }
281
    /// Returns the [`Location`] where this optional is being materialized.
    ///
    /// The returned reference borrows from `self`.
    pub fn location(&self) -> &L {
        &self.location
    }
286
287    /// Transforms the optional value by applying a function `f` to it,
288    /// continuously as the input is updated.
289    ///
290    /// Whenever the optional is empty, the output optional is also empty.
291    ///
292    /// # Example
293    /// ```rust
294    /// # #[cfg(feature = "deploy")] {
295    /// # use hydro_lang::prelude::*;
296    /// # use futures::StreamExt;
297    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
298    /// let tick = process.tick();
299    /// let optional = tick.optional_first_tick(q!(1));
300    /// optional.map(q!(|v| v + 1)).all_ticks()
301    /// # }, |mut stream| async move {
302    /// // 2
303    /// # assert_eq!(stream.next().await.unwrap(), 2);
304    /// # }));
305    /// # }
306    /// ```
307    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
308    where
309        F: Fn(T) -> U + 'a,
310    {
311        let f = f.splice_fn1_ctx(&self.location).into();
312        Optional::new(
313            self.location.clone(),
314            HydroNode::Map {
315                f,
316                input: Box::new(self.ir_node.into_inner()),
317                metadata: self
318                    .location
319                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
320            },
321        )
322    }
323
324    /// Transforms the optional value by applying a function `f` to it and then flattening
325    /// the result into a stream, preserving the order of elements.
326    ///
327    /// If the optional is empty, the output stream is also empty. If the optional contains
328    /// a value, `f` is applied to produce an iterator, and all items from that iterator
329    /// are emitted in the output stream in deterministic order.
330    ///
331    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
332    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
333    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
334    ///
335    /// # Example
336    /// ```rust
337    /// # #[cfg(feature = "deploy")] {
338    /// # use hydro_lang::prelude::*;
339    /// # use futures::StreamExt;
340    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
341    /// let tick = process.tick();
342    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
343    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
344    /// # }, |mut stream| async move {
345    /// // 1, 2, 3
346    /// # for w in vec![1, 2, 3] {
347    /// #     assert_eq!(stream.next().await.unwrap(), w);
348    /// # }
349    /// # }));
350    /// # }
351    /// ```
352    pub fn flat_map_ordered<U, I, F>(
353        self,
354        f: impl IntoQuotedMut<'a, F, L>,
355    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
356    where
357        I: IntoIterator<Item = U>,
358        F: Fn(T) -> I + 'a,
359    {
360        let f = f.splice_fn1_ctx(&self.location).into();
361        Stream::new(
362            self.location.clone(),
363            HydroNode::FlatMap {
364                f,
365                input: Box::new(self.ir_node.into_inner()),
366                metadata: self.location.new_node_metadata(
367                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
368                ),
369            },
370        )
371    }
372
373    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
374    /// for the output type `I` to produce items in any order.
375    ///
376    /// If the optional is empty, the output stream is also empty. If the optional contains
377    /// a value, `f` is applied to produce an iterator, and all items from that iterator
378    /// are emitted in the output stream in non-deterministic order.
379    ///
380    /// # Example
381    /// ```rust
382    /// # #[cfg(feature = "deploy")] {
383    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
384    /// # use futures::StreamExt;
385    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
386    /// let tick = process.tick();
387    /// let optional = tick.optional_first_tick(q!(
388    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
389    /// ));
390    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
391    /// # }, |mut stream| async move {
392    /// // 1, 2, 3, but in no particular order
393    /// # let mut results = Vec::new();
394    /// # for _ in 0..3 {
395    /// #     results.push(stream.next().await.unwrap());
396    /// # }
397    /// # results.sort();
398    /// # assert_eq!(results, vec![1, 2, 3]);
399    /// # }));
400    /// # }
401    /// ```
402    pub fn flat_map_unordered<U, I, F>(
403        self,
404        f: impl IntoQuotedMut<'a, F, L>,
405    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
406    where
407        I: IntoIterator<Item = U>,
408        F: Fn(T) -> I + 'a,
409    {
410        let f = f.splice_fn1_ctx(&self.location).into();
411        Stream::new(
412            self.location.clone(),
413            HydroNode::FlatMap {
414                f,
415                input: Box::new(self.ir_node.into_inner()),
416                metadata: self
417                    .location
418                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
419            },
420        )
421    }
422
    /// Flattens the optional value into a stream, preserving the order of elements.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in deterministic order.
    ///
    /// The implementation of [`IntoIterator`] for the element type `T` must produce items in a
    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
    /// optional.flatten_ordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure.
        self.flat_map_ordered(q!(|v| v))
    }
456
    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`IntoIterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in non-deterministic order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(
    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
    /// ));
    /// optional.flatten_unordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, but in no particular order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure.
        self.flat_map_unordered(q!(|v| v))
    }
492
493    /// Creates an optional containing only the value if it satisfies a predicate `f`.
494    ///
495    /// If the optional is empty, the output optional is also empty. If the optional contains
496    /// a value and the predicate returns `true`, the output optional contains the same value.
497    /// If the predicate returns `false`, the output optional is empty.
498    ///
499    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
500    /// not modify or take ownership of the value. If you need to modify the value while filtering
501    /// use [`Optional::filter_map`] instead.
502    ///
503    /// # Example
504    /// ```rust
505    /// # #[cfg(feature = "deploy")] {
506    /// # use hydro_lang::prelude::*;
507    /// # use futures::StreamExt;
508    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
509    /// let tick = process.tick();
510    /// let optional = tick.optional_first_tick(q!(5));
511    /// optional.filter(q!(|&x| x > 3)).all_ticks()
512    /// # }, |mut stream| async move {
513    /// // 5
514    /// # assert_eq!(stream.next().await.unwrap(), 5);
515    /// # }));
516    /// # }
517    /// ```
518    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
519    where
520        F: Fn(&T) -> bool + 'a,
521    {
522        let f = f.splice_fn1_borrow_ctx(&self.location).into();
523        Optional::new(
524            self.location.clone(),
525            HydroNode::Filter {
526                f,
527                input: Box::new(self.ir_node.into_inner()),
528                metadata: self.location.new_node_metadata(Self::collection_kind()),
529            },
530        )
531    }
532
533    /// An operator that both filters and maps. It yields only the value if the supplied
534    /// closure `f` returns `Some(value)`.
535    ///
536    /// If the optional is empty, the output optional is also empty. If the optional contains
537    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
538    /// If the closure returns `None`, the output optional is empty.
539    ///
540    /// # Example
541    /// ```rust
542    /// # #[cfg(feature = "deploy")] {
543    /// # use hydro_lang::prelude::*;
544    /// # use futures::StreamExt;
545    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
546    /// let tick = process.tick();
547    /// let optional = tick.optional_first_tick(q!("42"));
548    /// optional
549    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
550    ///     .all_ticks()
551    /// # }, |mut stream| async move {
552    /// // 42
553    /// # assert_eq!(stream.next().await.unwrap(), 42);
554    /// # }));
555    /// # }
556    /// ```
557    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
558    where
559        F: Fn(T) -> Option<U> + 'a,
560    {
561        let f = f.splice_fn1_ctx(&self.location).into();
562        Optional::new(
563            self.location.clone(),
564            HydroNode::FilterMap {
565                f,
566                input: Box::new(self.ir_node.into_inner()),
567                metadata: self
568                    .location
569                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
570            },
571        )
572    }
573
    /// Combines this optional with another [`Singleton`] or [`Optional`] by tupling their values.
    ///
    /// If the other value is an [`Optional`], the output will be non-null only if the argument is
    /// non-null. This is useful for combining several pieces of state together.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///   .source_iter(q!(vec![123, 456, 789]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let min = numbers.clone().min(); // Optional
    /// let max = numbers.max(); // Optional
    /// min.zip(max).all_ticks()
    /// # }, |mut stream| async move {
    /// // [(123, 789)]
    /// # for w in vec![(123, 789)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
    where
        O: Clone,
    {
        let other: Optional<O, L, B> = other.into();
        check_matching_location(&self.location, &other.location);

        // At a top-level location that can provide a tick, snapshot both inputs into that
        // tick, zip them there, and expose the latest result back at the original location.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            Optional::new(out.location, out.ir_node.into_inner())
        } else {
            // Otherwise the two inputs can be zipped directly.
            zip_inside_tick(self, other)
        }
    }
621
622    /// Passes through `self` when it has a value, otherwise passes through `other`.
623    ///
624    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
625    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
626    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
627    ///
628    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
629    /// of the inputs change (including to/from null states).
630    ///
631    /// # Example
632    /// ```rust
633    /// # #[cfg(feature = "deploy")] {
634    /// # use hydro_lang::prelude::*;
635    /// # use futures::StreamExt;
636    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
637    /// let tick = process.tick();
638    /// // ticks are lazy by default, forces the second tick to run
639    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
640    ///
641    /// let some_first_tick = tick.optional_first_tick(q!(123));
642    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
643    /// some_first_tick.or(some_second_tick).all_ticks()
644    /// # }, |mut stream| async move {
645    /// // [123 /* first tick */, 456 /* second tick */]
646    /// # for w in vec![123, 456] {
647    /// #     assert_eq!(stream.next().await.unwrap(), w);
648    /// # }
649    /// # }));
650    /// # }
651    /// ```
652    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
653        check_matching_location(&self.location, &other.location);
654
655        if L::is_top_level()
656            && let Some(tick) = self.location.try_tick()
657        {
658            let out = or_inside_tick(
659                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
660                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
661            )
662            .latest();
663
664            Optional::new(out.location, out.ir_node.into_inner())
665        } else {
666            Optional::new(
667                self.location.clone(),
668                HydroNode::ChainFirst {
669                    first: Box::new(self.ir_node.into_inner()),
670                    second: Box::new(other.ir_node.into_inner()),
671                    metadata: self.location.new_node_metadata(Self::collection_kind()),
672                },
673            )
674        }
675    }
676
677    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
678    ///
679    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
680    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
681    ///
682    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
683    /// of the inputs change (including to/from null states).
684    ///
685    /// # Example
686    /// ```rust
687    /// # #[cfg(feature = "deploy")] {
688    /// # use hydro_lang::prelude::*;
689    /// # use futures::StreamExt;
690    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
691    /// let tick = process.tick();
692    /// // ticks are lazy by default, forces the later ticks to run
693    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
694    ///
695    /// let some_first_tick = tick.optional_first_tick(q!(123));
696    /// some_first_tick
697    ///     .unwrap_or(tick.singleton(q!(456)))
698    ///     .all_ticks()
699    /// # }, |mut stream| async move {
700    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
701    /// # for w in vec![123, 456, 456, 456] {
702    /// #     assert_eq!(stream.next().await.unwrap(), w);
703    /// # }
704    /// # }));
705    /// # }
706    /// ```
707    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
708        let res_option = self.or(other.into());
709        Singleton::new(
710            res_option.location.clone(),
711            HydroNode::Cast {
712                inner: Box::new(res_option.ir_node.into_inner()),
713                metadata: res_option
714                    .location
715                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
716            },
717        )
718    }
719
720    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
721    ///
722    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
723    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
724    /// so that Hydro can skip any computation on null values.
725    ///
726    /// # Example
727    /// ```rust
728    /// # #[cfg(feature = "deploy")] {
729    /// # use hydro_lang::prelude::*;
730    /// # use futures::StreamExt;
731    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
732    /// let tick = process.tick();
733    /// // ticks are lazy by default, forces the later ticks to run
734    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
735    ///
736    /// let some_first_tick = tick.optional_first_tick(q!(123));
737    /// some_first_tick.into_singleton().all_ticks()
738    /// # }, |mut stream| async move {
739    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
740    /// # for w in vec![Some(123), None, None, None] {
741    /// #     assert_eq!(stream.next().await.unwrap(), w);
742    /// # }
743    /// # }));
744    /// # }
745    /// ```
746    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
747    where
748        T: Clone,
749    {
750        let none: syn::Expr = parse_quote!(::std::option::Option::None);
751
752        let none_singleton = Singleton::new(
753            self.location.clone(),
754            HydroNode::SingletonSource {
755                value: none.into(),
756                metadata: self
757                    .location
758                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
759            },
760        );
761
762        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
763    }
764
765    /// An operator which allows you to "name" a `HydroNode`.
766    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
767    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
768        {
769            let mut node = self.ir_node.borrow_mut();
770            let metadata = node.metadata_mut();
771            metadata.tag = Some(name.to_string());
772        }
773        self
774    }
775}
776
777impl<'a, T, L> Optional<T, L, Bounded>
778where
779    L: Location<'a>,
780{
    /// Filters this optional, passing through the optional value if it is non-null **and** the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
    ///
    /// Useful for conditionally processing, such as only emitting an optional's value outside
    /// a tick if some other condition is satisfied.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![456]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick).first()
    ///   .filter_if_some(some_on_first_tick)
    ///   .unwrap_or(tick.singleton(q!(789)))
    ///   .all_ticks()
    /// # }, |mut stream| async move {
    /// // [789, 789]
    /// # for w in vec![789, 789] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
        // Reduce the signal to `()`, zip it with `self`, then drop the signal half of the pair.
        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
    }
820
821    /// Filters this optional, passing through the optional value if it is non-null **and** the
822    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
823    ///
824    /// Useful for conditionally processing, such as only emitting an optional's value outside
825    /// a tick if some other condition is satisfied.
826    ///
827    /// # Example
828    /// ```rust
829    /// # #[cfg(feature = "deploy")] {
830    /// # use hydro_lang::prelude::*;
831    /// # use futures::StreamExt;
832    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
833    /// let tick = process.tick();
834    /// // ticks are lazy by default, forces the second tick to run
835    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
836    ///
837    /// let batch_first_tick = process
838    ///   .source_iter(q!(vec![]))
839    ///   .batch(&tick, nondet!(/** test */));
840    /// let batch_second_tick = process
841    ///   .source_iter(q!(vec![456]))
842    ///   .batch(&tick, nondet!(/** test */))
843    ///   .defer_tick(); // appears on the second tick
844    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
845    /// batch_first_tick.chain(batch_second_tick).first()
846    ///   .filter_if_none(some_on_first_tick)
847    ///   .unwrap_or(tick.singleton(q!(789)))
848    ///   .all_ticks()
849    /// # }, |mut stream| async move {
850    /// // [789, 789]
851    /// # for w in vec![789, 456] {
852    /// #     assert_eq!(stream.next().await.unwrap(), w);
853    /// # }
854    /// # }));
855    /// # }
856    /// ```
857    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
858        self.filter_if_some(
859            other
860                .map(q!(|_| ()))
861                .into_singleton()
862                .filter(q!(|o| o.is_none())),
863        )
864    }
865
    /// If `self` is null, emits a null optional, but if it is non-null, emits `value`.
    ///
    /// The value held by `self` is not part of the output; `self` only acts as the gate that
    /// decides whether `value` is released.
    ///
    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
    /// having a value, such as only releasing a piece of state if the node is the leader.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// some_on_first_tick
    ///     .if_some_then(tick.singleton(q!(456)))
    ///     .unwrap_or(tick.singleton(q!(123)))
    /// # .all_ticks()
    /// # }, |mut stream| async move {
    /// // 456 (first tick) ~> 123 (second tick onwards)
    /// # for w in vec![456, 123, 123] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
        // Delegates to the singleton's `filter_if_some`, with `self` as the gating signal.
        value.filter_if_some(self)
    }
897}
898
899impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
900where
901    L: Location<'a> + NoTick,
902{
903    /// Returns an optional value corresponding to the latest snapshot of the optional
904    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
905    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
906    /// all snapshots of this optional into the atomic-associated tick will observe the
907    /// same value each tick.
908    ///
909    /// # Non-Determinism
910    /// Because this picks a snapshot of a optional whose value is continuously changing,
911    /// the output optional has a non-deterministic value since the snapshot can be at an
912    /// arbitrary point in time.
913    pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
914        Optional::new(
915            self.location.clone().tick,
916            HydroNode::Batch {
917                inner: Box::new(self.ir_node.into_inner()),
918                metadata: self
919                    .location
920                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
921            },
922        )
923    }
924
925    /// Returns this optional back into a top-level, asynchronous execution context where updates
926    /// to the value will be asynchronously propagated.
927    pub fn end_atomic(self) -> Optional<T, L, B> {
928        Optional::new(
929            self.location.tick.l.clone(),
930            HydroNode::EndAtomic {
931                inner: Box::new(self.ir_node.into_inner()),
932                metadata: self
933                    .location
934                    .tick
935                    .l
936                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
937            },
938        )
939    }
940}
941
942impl<'a, T, L, B: Boundedness> Optional<T, L, B>
943where
944    L: Location<'a>,
945{
946    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
947    /// will observe the same version of the value and will be executed synchronously before any
948    /// outputs are yielded (in [`Optional::end_atomic`]).
949    ///
950    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
951    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
952    /// a different version).
953    ///
954    /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
955    /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
956    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
957    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
958        let out_location = Atomic { tick: tick.clone() };
959        Optional::new(
960            out_location.clone(),
961            HydroNode::BeginAtomic {
962                inner: Box::new(self.ir_node.into_inner()),
963                metadata: out_location
964                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
965            },
966        )
967    }
968
969    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
970    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
971    /// relevant data that contributed to the snapshot at tick `t`.
972    ///
973    /// # Non-Determinism
974    /// Because this picks a snapshot of a optional whose value is continuously changing,
975    /// the output optional has a non-deterministic value since the snapshot can be at an
976    /// arbitrary point in time.
977    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
978        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
979        Optional::new(
980            tick.clone(),
981            HydroNode::Batch {
982                inner: Box::new(self.ir_node.into_inner()),
983                metadata: tick
984                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
985            },
986        )
987    }
988
    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
    /// with order corresponding to increasing prefixes of data contributing to the optional.
    ///
    /// The samples are taken by snapshotting into a tick created on this optional's location
    /// and emitting the snapshot of every tick.
    ///
    /// # Non-Determinism
    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
    /// to non-deterministic batching and arrival of inputs, the output stream is
    /// non-deterministic.
    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
    where
        L: NoTick,
    {
        // A dedicated tick drives the sampling.
        let tick = self.location.tick();
        // `weakest_retries` relaxes the retry guarantee to the `AtLeastOnce` in the signature.
        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
    }
1003
1004    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1005    /// value taken at various points in time. Because the input optional may be
1006    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1007    /// represent the value of the optional given some prefix of the streams leading up to
1008    /// it.
1009    ///
1010    /// # Non-Determinism
1011    /// The output stream is non-deterministic in which elements are sampled, since this
1012    /// is controlled by a clock.
1013    pub fn sample_every(
1014        self,
1015        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1016        nondet: NonDet,
1017    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1018    where
1019        L: NoTick + NoAtomic,
1020    {
1021        let samples = self.location.source_interval(interval, nondet);
1022        let tick = self.location.tick();
1023
1024        self.snapshot(&tick, nondet)
1025            .filter_if_some(samples.batch(&tick, nondet).first())
1026            .all_ticks()
1027            .weakest_retries()
1028    }
1029}
1030
1031impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1032where
1033    L: Location<'a>,
1034{
    /// Asynchronously yields the value of this optional outside the tick as an unbounded stream,
    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
    /// null values).
    ///
    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
    /// producing one element in the output for each (non-null) tick. This is useful for batched
    /// computations, where the results from each tick must be combined together.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #   .source_iter(q!(vec![]))
    /// #   .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #   .source_iter(q!(vec![1, 2, 3]))
    /// #   .batch(&tick, nondet!(/** test */))
    /// #   .defer_tick(); // appears on the second tick
    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
    /// input_batch // first tick: [], second tick: [1, 2, 3]
    ///     .max()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [3]
    /// # for w in vec![3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
        // View the optional as a stream of zero or one elements, then reuse the stream's
        // `all_ticks`.
        self.into_stream().all_ticks()
    }
1074
    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
    /// which will stream the value computed in _each_ tick as a separate stream element.
    ///
    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// optional's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
        // View the optional as a stream of zero or one elements, then reuse the stream's
        // `all_ticks_atomic`.
        self.into_stream().all_ticks_atomic()
    }
1084
1085    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1086    /// be asynchronously updated with the latest value of the optional inside the tick, including
1087    /// whether the optional is null or not.
1088    ///
1089    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1090    /// tick that tracks the inner value. This is useful for getting the value as of the
1091    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1092    ///
1093    /// # Example
1094    /// ```rust
1095    /// # #[cfg(feature = "deploy")] {
1096    /// # use hydro_lang::prelude::*;
1097    /// # use futures::StreamExt;
1098    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1099    /// # let tick = process.tick();
1100    /// # // ticks are lazy by default, forces the second tick to run
1101    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1102    /// # let batch_first_tick = process
1103    /// #   .source_iter(q!(vec![]))
1104    /// #   .batch(&tick, nondet!(/** test */));
1105    /// # let batch_second_tick = process
1106    /// #   .source_iter(q!(vec![1, 2, 3]))
1107    /// #   .batch(&tick, nondet!(/** test */))
1108    /// #   .defer_tick(); // appears on the second tick
1109    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1110    /// input_batch // first tick: [], second tick: [1, 2, 3]
1111    ///     .max()
1112    ///     .latest()
1113    /// # .into_singleton()
1114    /// # .sample_eager(nondet!(/** test */))
1115    /// # }, |mut stream| async move {
1116    /// // asynchronously changes from None ~> 3
1117    /// # for w in vec![None, Some(3)] {
1118    /// #     assert_eq!(stream.next().await.unwrap(), w);
1119    /// # }
1120    /// # }));
1121    /// # }
1122    /// ```
1123    pub fn latest(self) -> Optional<T, L, Unbounded> {
1124        Optional::new(
1125            self.location.outer().clone(),
1126            HydroNode::YieldConcat {
1127                inner: Box::new(self.ir_node.into_inner()),
1128                metadata: self
1129                    .location
1130                    .outer()
1131                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1132            },
1133        )
1134    }
1135
1136    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1137    /// be updated with the latest value of the optional inside the tick.
1138    ///
1139    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1140    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1141    /// optional's [`Tick`] context.
1142    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1143        let out_location = Atomic {
1144            tick: self.location.clone(),
1145        };
1146
1147        Optional::new(
1148            out_location.clone(),
1149            HydroNode::YieldConcat {
1150                inner: Box::new(self.ir_node.into_inner()),
1151                metadata: out_location
1152                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1153            },
1154        )
1155    }
1156
1157    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1158    /// always has the state of `self` at tick `T - 1`.
1159    ///
1160    /// At tick `0`, the output optional is null, since there is no previous tick.
1161    ///
1162    /// This operator enables stateful iterative processing with ticks, by sending data from one
1163    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1164    ///
1165    /// # Example
1166    /// ```rust
1167    /// # #[cfg(feature = "deploy")] {
1168    /// # use hydro_lang::prelude::*;
1169    /// # use futures::StreamExt;
1170    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1171    /// let tick = process.tick();
1172    /// // ticks are lazy by default, forces the second tick to run
1173    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1174    ///
1175    /// let batch_first_tick = process
1176    ///   .source_iter(q!(vec![1, 2]))
1177    ///   .batch(&tick, nondet!(/** test */));
1178    /// let batch_second_tick = process
1179    ///   .source_iter(q!(vec![3, 4]))
1180    ///   .batch(&tick, nondet!(/** test */))
1181    ///   .defer_tick(); // appears on the second tick
1182    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1183    ///   .reduce(q!(|state, v| *state += v));
1184    ///
1185    /// current_tick_sum.clone().into_singleton().zip(
1186    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1187    /// ).all_ticks()
1188    /// # }, |mut stream| async move {
1189    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1190    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1191    /// #     assert_eq!(stream.next().await.unwrap(), w);
1192    /// # }
1193    /// # }));
1194    /// # }
1195    /// ```
1196    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1197        Optional::new(
1198            self.location.clone(),
1199            HydroNode::DeferTick {
1200                input: Box::new(self.ir_node.into_inner()),
1201                metadata: self.location.new_node_metadata(Self::collection_kind()),
1202            },
1203        )
1204    }
1205
    // Deprecated forwarder: it does exactly what the deprecation note tells callers to do.
    // Intentionally left without a `///` doc comment, since `missing_docs` is expected below.
    #[deprecated(note = "use .into_stream().persist()")]
    #[expect(missing_docs, reason = "deprecated")]
    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
    where
        T: Clone,
    {
        self.into_stream().persist()
    }
1214
1215    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1216    /// non-null. Otherwise, the stream is empty.
1217    ///
1218    /// # Example
1219    /// ```rust
1220    /// # #[cfg(feature = "deploy")] {
1221    /// # use hydro_lang::prelude::*;
1222    /// # use futures::StreamExt;
1223    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1224    /// # let tick = process.tick();
1225    /// # // ticks are lazy by default, forces the second tick to run
1226    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1227    /// # let batch_first_tick = process
1228    /// #   .source_iter(q!(vec![]))
1229    /// #   .batch(&tick, nondet!(/** test */));
1230    /// # let batch_second_tick = process
1231    /// #   .source_iter(q!(vec![123, 456]))
1232    /// #   .batch(&tick, nondet!(/** test */))
1233    /// #   .defer_tick(); // appears on the second tick
1234    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1235    /// input_batch // first tick: [], second tick: [123, 456]
1236    ///     .clone()
1237    ///     .max()
1238    ///     .into_stream()
1239    ///     .chain(input_batch)
1240    ///     .all_ticks()
1241    /// # }, |mut stream| async move {
1242    /// // [456, 123, 456]
1243    /// # for w in vec![456, 123, 456] {
1244    /// #     assert_eq!(stream.next().await.unwrap(), w);
1245    /// # }
1246    /// # }));
1247    /// # }
1248    /// ```
1249    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1250        Stream::new(
1251            self.location.clone(),
1252            HydroNode::Cast {
1253                inner: Box::new(self.ir_node.into_inner()),
1254                metadata: self.location.new_node_metadata(Stream::<
1255                    T,
1256                    Tick<L>,
1257                    Bounded,
1258                    TotalOrder,
1259                    ExactlyOnce,
1260                >::collection_kind()),
1261            },
1262        )
1263    }
1264}
1265
// Deployment-backed tests: they only build for test runs with the `deploy` feature enabled.
#[cfg(feature = "deploy")]
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use stageleft::q;

    use super::Optional;
    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    // `or`-ing an inhabited optional with a clone of itself is expected to still hold a single
    // value: the number of elements observed per tick is asserted to be 1.
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let tick_singleton = node_tick.singleton(q!(123));
        // Converted from a singleton, so this optional always holds a value.
        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
        let counts = tick_optional_inhabited
            .clone()
            .or(tick_optional_inhabited)
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the external output before starting the deployment.
        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }

    // A top-level optional that is always null (its filter rejects everything) is converted
    // with `into_singleton`; every snapshot of the result is expected to contain exactly one
    // element, checked here over three consecutive outputs.
    #[tokio::test]
    async fn into_singleton_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
        let into_singleton = top_level_none.into_singleton();

        // NOTE(review): zipping with the count of the `spin` batch presumably keeps the tick
        // running so that several outputs can be observed — confirm.
        let tick_driver = node.spin();

        let counts = into_singleton
            .snapshot(&node_tick, nondet!(/** test */))
            .into_stream()
            .count()
            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the external output before starting the deployment.
        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
        assert_eq!(external_out.next().await.unwrap(), 1);
        assert_eq!(external_out.next().await.unwrap(), 1);
    }
}