hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
/// A *nullable* Rust value that can asynchronously change over time.
///
/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
///
/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
///
/// Type Parameters:
/// - `Type`: the type of the value in this optional (when it is not null)
/// - `Loc`: the [`Location`] where the optional is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    /// The location handle this optional was constructed with.
    pub(crate) location: Loc,
    /// The IR node backing this optional. It lives in a [`RefCell`] because `Clone::clone`
    /// rewrites the node in place while only holding `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker that ties the `Type`, `Loc` and `Bound` parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
43
44impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
45where
46    T: Clone,
47    L: Location<'a> + NoTick,
48{
49    fn from(value: Optional<T, L, Bounded>) -> Self {
50        let tick = value.location().tick();
51        value.clone_into_tick(&tick).latest()
52    }
53}
54
55impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
56where
57    L: Location<'a>,
58{
59    fn defer_tick(self) -> Self {
60        Optional::defer_tick(self)
61    }
62}
63
64impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
65where
66    L: Location<'a>,
67{
68    type Location = Tick<L>;
69
70    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
71        Optional::new(
72            location.clone(),
73            HydroNode::CycleSource {
74                ident,
75                metadata: location.new_node_metadata(Self::collection_kind()),
76            },
77        )
78    }
79}
80
81impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
82where
83    L: Location<'a>,
84{
85    type Location = Tick<L>;
86
87    fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
88        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
89            location.clone(),
90            HydroNode::DeferTick {
91                input: Box::new(HydroNode::CycleSource {
92                    ident,
93                    metadata: location.new_node_metadata(Self::collection_kind()),
94                }),
95                metadata: location
96                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
97            },
98        );
99
100        from_previous_tick.or(initial.filter_if_some(location.optional_first_tick(q!(()))))
101    }
102}
103
104impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
105where
106    L: Location<'a>,
107{
108    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
109        assert_eq!(
110            Location::id(&self.location),
111            expected_location,
112            "locations do not match"
113        );
114        self.location
115            .flow_state()
116            .borrow_mut()
117            .push_root(HydroRoot::CycleSink {
118                ident,
119                input: Box::new(self.ir_node.into_inner()),
120                op_metadata: HydroIrOpMetadata::new(),
121            });
122    }
123}
124
125impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
126where
127    L: Location<'a>,
128{
129    type Location = Tick<L>;
130
131    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
132        Optional::new(
133            location.clone(),
134            HydroNode::CycleSource {
135                ident,
136                metadata: location.new_node_metadata(Self::collection_kind()),
137            },
138        )
139    }
140}
141
142impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
143where
144    L: Location<'a>,
145{
146    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
147        assert_eq!(
148            Location::id(&self.location),
149            expected_location,
150            "locations do not match"
151        );
152        self.location
153            .flow_state()
154            .borrow_mut()
155            .push_root(HydroRoot::CycleSink {
156                ident,
157                input: Box::new(self.ir_node.into_inner()),
158                op_metadata: HydroIrOpMetadata::new(),
159            });
160    }
161}
162
163impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
164where
165    L: Location<'a> + NoTick,
166{
167    type Location = L;
168
169    fn create_source(ident: syn::Ident, location: L) -> Self {
170        Optional::new(
171            location.clone(),
172            HydroNode::CycleSource {
173                ident,
174                metadata: location.new_node_metadata(Self::collection_kind()),
175            },
176        )
177    }
178}
179
180impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
181where
182    L: Location<'a> + NoTick,
183{
184    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
185        assert_eq!(
186            Location::id(&self.location),
187            expected_location,
188            "locations do not match"
189        );
190        self.location
191            .flow_state()
192            .borrow_mut()
193            .push_root(HydroRoot::CycleSink {
194                ident,
195                input: Box::new(self.ir_node.into_inner()),
196                op_metadata: HydroIrOpMetadata::new(),
197            });
198    }
199}
200
201impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
202where
203    L: Location<'a>,
204{
205    fn from(singleton: Singleton<T, L, B>) -> Self {
206        Optional::new(
207            singleton.location.clone(),
208            HydroNode::Cast {
209                inner: Box::new(singleton.ir_node.into_inner()),
210                metadata: singleton
211                    .location
212                    .new_node_metadata(Self::collection_kind()),
213            },
214        )
215    }
216}
217
/// Pairs two optionals at the same location by emitting a `CrossSingleton` node over their
/// IR nodes.
///
/// Both locations are validated with `check_matching_location` before the node is built.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let left = Box::new(me.ir_node.into_inner());
    let right = Box::new(other.ir_node.into_inner());
    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());

    Optional::new(
        me.location.clone(),
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
236
/// Combines two optionals at the same location by emitting a `ChainFirst` node with `me` as
/// the first input and `other` as the second.
///
/// Both locations are validated with `check_matching_location` before the node is built.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let first = Box::new(me.ir_node.into_inner());
    let second = Box::new(other.ir_node.into_inner());
    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());

    Optional::new(
        me.location.clone(),
        HydroNode::ChainFirst {
            first,
            second,
            metadata,
        },
    )
}
255
256impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
257where
258    T: Clone,
259    L: Location<'a>,
260{
261    fn clone(&self) -> Self {
262        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
263            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
264            *self.ir_node.borrow_mut() = HydroNode::Tee {
265                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
266                metadata: self.location.new_node_metadata(Self::collection_kind()),
267            };
268        }
269
270        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
271            Optional {
272                location: self.location.clone(),
273                ir_node: HydroNode::Tee {
274                    inner: TeeNode(inner.0.clone()),
275                    metadata: metadata.clone(),
276                }
277                .into(),
278                _phantom: PhantomData,
279            }
280        } else {
281            unreachable!()
282        }
283    }
284}
285
286impl<'a, T, L, B: Boundedness> Optional<T, L, B>
287where
288    L: Location<'a>,
289{
290    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
291        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
292        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
293        Optional {
294            location,
295            ir_node: RefCell::new(ir_node),
296            _phantom: PhantomData,
297        }
298    }
299
300    pub(crate) fn collection_kind() -> CollectionKind {
301        CollectionKind::Optional {
302            bound: B::BOUND_KIND,
303            element_type: stageleft::quote_type::<T>().into(),
304        }
305    }
306
    /// Returns the [`Location`] where this optional is being materialized.
    ///
    /// The location is borrowed from this optional; nothing is cloned.
    pub fn location(&self) -> &L {
        &self.location
    }
311
312    /// Transforms the optional value by applying a function `f` to it,
313    /// continuously as the input is updated.
314    ///
315    /// Whenever the optional is empty, the output optional is also empty.
316    ///
317    /// # Example
318    /// ```rust
319    /// # #[cfg(feature = "deploy")] {
320    /// # use hydro_lang::prelude::*;
321    /// # use futures::StreamExt;
322    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
323    /// let tick = process.tick();
324    /// let optional = tick.optional_first_tick(q!(1));
325    /// optional.map(q!(|v| v + 1)).all_ticks()
326    /// # }, |mut stream| async move {
327    /// // 2
328    /// # assert_eq!(stream.next().await.unwrap(), 2);
329    /// # }));
330    /// # }
331    /// ```
332    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
333    where
334        F: Fn(T) -> U + 'a,
335    {
336        let f = f.splice_fn1_ctx(&self.location).into();
337        Optional::new(
338            self.location.clone(),
339            HydroNode::Map {
340                f,
341                input: Box::new(self.ir_node.into_inner()),
342                metadata: self
343                    .location
344                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
345            },
346        )
347    }
348
349    /// Transforms the optional value by applying a function `f` to it and then flattening
350    /// the result into a stream, preserving the order of elements.
351    ///
352    /// If the optional is empty, the output stream is also empty. If the optional contains
353    /// a value, `f` is applied to produce an iterator, and all items from that iterator
354    /// are emitted in the output stream in deterministic order.
355    ///
356    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
357    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
358    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
359    ///
360    /// # Example
361    /// ```rust
362    /// # #[cfg(feature = "deploy")] {
363    /// # use hydro_lang::prelude::*;
364    /// # use futures::StreamExt;
365    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
366    /// let tick = process.tick();
367    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
368    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
369    /// # }, |mut stream| async move {
370    /// // 1, 2, 3
371    /// # for w in vec![1, 2, 3] {
372    /// #     assert_eq!(stream.next().await.unwrap(), w);
373    /// # }
374    /// # }));
375    /// # }
376    /// ```
377    pub fn flat_map_ordered<U, I, F>(
378        self,
379        f: impl IntoQuotedMut<'a, F, L>,
380    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
381    where
382        I: IntoIterator<Item = U>,
383        F: Fn(T) -> I + 'a,
384    {
385        let f = f.splice_fn1_ctx(&self.location).into();
386        Stream::new(
387            self.location.clone(),
388            HydroNode::FlatMap {
389                f,
390                input: Box::new(self.ir_node.into_inner()),
391                metadata: self.location.new_node_metadata(
392                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
393                ),
394            },
395        )
396    }
397
398    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
399    /// for the output type `I` to produce items in any order.
400    ///
401    /// If the optional is empty, the output stream is also empty. If the optional contains
402    /// a value, `f` is applied to produce an iterator, and all items from that iterator
403    /// are emitted in the output stream in non-deterministic order.
404    ///
405    /// # Example
406    /// ```rust
407    /// # #[cfg(feature = "deploy")] {
408    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
409    /// # use futures::StreamExt;
410    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
411    /// let tick = process.tick();
412    /// let optional = tick.optional_first_tick(q!(
413    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
414    /// ));
415    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
416    /// # }, |mut stream| async move {
417    /// // 1, 2, 3, but in no particular order
418    /// # let mut results = Vec::new();
419    /// # for _ in 0..3 {
420    /// #     results.push(stream.next().await.unwrap());
421    /// # }
422    /// # results.sort();
423    /// # assert_eq!(results, vec![1, 2, 3]);
424    /// # }));
425    /// # }
426    /// ```
427    pub fn flat_map_unordered<U, I, F>(
428        self,
429        f: impl IntoQuotedMut<'a, F, L>,
430    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
431    where
432        I: IntoIterator<Item = U>,
433        F: Fn(T) -> I + 'a,
434    {
435        let f = f.splice_fn1_ctx(&self.location).into();
436        Stream::new(
437            self.location.clone(),
438            HydroNode::FlatMap {
439                f,
440                input: Box::new(self.ir_node.into_inner()),
441                metadata: self
442                    .location
443                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
444            },
445        )
446    }
447
    /// Flattens the optional value into a stream, preserving the order of elements.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in deterministic order.
    ///
    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
    ///
    /// This is shorthand for [`Optional::flat_map_ordered`] with the identity function.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
    /// optional.flatten_ordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // The identity closure hands the contained value straight to the flat-map as its iterator.
        self.flat_map_ordered(q!(|v| v))
    }
481
    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in non-deterministic order.
    ///
    /// This is shorthand for [`Optional::flat_map_unordered`] with the identity function.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(
    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
    /// ));
    /// optional.flatten_unordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, but in no particular order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // The identity closure hands the contained value straight to the flat-map as its iterator.
        self.flat_map_unordered(q!(|v| v))
    }
517
518    /// Creates an optional containing only the value if it satisfies a predicate `f`.
519    ///
520    /// If the optional is empty, the output optional is also empty. If the optional contains
521    /// a value and the predicate returns `true`, the output optional contains the same value.
522    /// If the predicate returns `false`, the output optional is empty.
523    ///
524    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
525    /// not modify or take ownership of the value. If you need to modify the value while filtering
526    /// use [`Optional::filter_map`] instead.
527    ///
528    /// # Example
529    /// ```rust
530    /// # #[cfg(feature = "deploy")] {
531    /// # use hydro_lang::prelude::*;
532    /// # use futures::StreamExt;
533    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
534    /// let tick = process.tick();
535    /// let optional = tick.optional_first_tick(q!(5));
536    /// optional.filter(q!(|&x| x > 3)).all_ticks()
537    /// # }, |mut stream| async move {
538    /// // 5
539    /// # assert_eq!(stream.next().await.unwrap(), 5);
540    /// # }));
541    /// # }
542    /// ```
543    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
544    where
545        F: Fn(&T) -> bool + 'a,
546    {
547        let f = f.splice_fn1_borrow_ctx(&self.location).into();
548        Optional::new(
549            self.location.clone(),
550            HydroNode::Filter {
551                f,
552                input: Box::new(self.ir_node.into_inner()),
553                metadata: self.location.new_node_metadata(Self::collection_kind()),
554            },
555        )
556    }
557
558    /// An operator that both filters and maps. It yields only the value if the supplied
559    /// closure `f` returns `Some(value)`.
560    ///
561    /// If the optional is empty, the output optional is also empty. If the optional contains
562    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
563    /// If the closure returns `None`, the output optional is empty.
564    ///
565    /// # Example
566    /// ```rust
567    /// # #[cfg(feature = "deploy")] {
568    /// # use hydro_lang::prelude::*;
569    /// # use futures::StreamExt;
570    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
571    /// let tick = process.tick();
572    /// let optional = tick.optional_first_tick(q!("42"));
573    /// optional
574    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
575    ///     .all_ticks()
576    /// # }, |mut stream| async move {
577    /// // 42
578    /// # assert_eq!(stream.next().await.unwrap(), 42);
579    /// # }));
580    /// # }
581    /// ```
582    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
583    where
584        F: Fn(T) -> Option<U> + 'a,
585    {
586        let f = f.splice_fn1_ctx(&self.location).into();
587        Optional::new(
588            self.location.clone(),
589            HydroNode::FilterMap {
590                f,
591                input: Box::new(self.ir_node.into_inner()),
592                metadata: self
593                    .location
594                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
595            },
596        )
597    }
598
599    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
600    ///
601    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
602    /// non-null. This is useful for combining several pieces of state together.
603    ///
604    /// # Example
605    /// ```rust
606    /// # #[cfg(feature = "deploy")] {
607    /// # use hydro_lang::prelude::*;
608    /// # use futures::StreamExt;
609    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
610    /// let tick = process.tick();
611    /// let numbers = process
612    ///   .source_iter(q!(vec![123, 456, 789]))
613    ///   .batch(&tick, nondet!(/** test */));
614    /// let min = numbers.clone().min(); // Optional
615    /// let max = numbers.max(); // Optional
616    /// min.zip(max).all_ticks()
617    /// # }, |mut stream| async move {
618    /// // [(123, 789)]
619    /// # for w in vec![(123, 789)] {
620    /// #     assert_eq!(stream.next().await.unwrap(), w);
621    /// # }
622    /// # }));
623    /// # }
624    /// ```
625    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
626    where
627        O: Clone,
628    {
629        let other: Optional<O, L, B> = other.into();
630        check_matching_location(&self.location, &other.location);
631
632        if L::is_top_level()
633            && let Some(tick) = self.location.try_tick()
634        {
635            let out = zip_inside_tick(
636                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
637                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
638            )
639            .latest();
640
641            Optional::new(out.location, out.ir_node.into_inner())
642        } else {
643            zip_inside_tick(self, other)
644        }
645    }
646
647    /// Passes through `self` when it has a value, otherwise passes through `other`.
648    ///
649    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
650    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
651    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
652    ///
653    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
654    /// of the inputs change (including to/from null states).
655    ///
656    /// # Example
657    /// ```rust
658    /// # #[cfg(feature = "deploy")] {
659    /// # use hydro_lang::prelude::*;
660    /// # use futures::StreamExt;
661    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
662    /// let tick = process.tick();
663    /// // ticks are lazy by default, forces the second tick to run
664    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
665    ///
666    /// let some_first_tick = tick.optional_first_tick(q!(123));
667    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
668    /// some_first_tick.or(some_second_tick).all_ticks()
669    /// # }, |mut stream| async move {
670    /// // [123 /* first tick */, 456 /* second tick */]
671    /// # for w in vec![123, 456] {
672    /// #     assert_eq!(stream.next().await.unwrap(), w);
673    /// # }
674    /// # }));
675    /// # }
676    /// ```
677    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
678        check_matching_location(&self.location, &other.location);
679
680        if L::is_top_level()
681            && !B::BOUNDED // only if unbounded we need to use a tick
682            && let Some(tick) = self.location.try_tick()
683        {
684            let out = or_inside_tick(
685                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
686                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
687            )
688            .latest();
689
690            Optional::new(out.location, out.ir_node.into_inner())
691        } else {
692            Optional::new(
693                self.location.clone(),
694                HydroNode::ChainFirst {
695                    first: Box::new(self.ir_node.into_inner()),
696                    second: Box::new(other.ir_node.into_inner()),
697                    metadata: self.location.new_node_metadata(Self::collection_kind()),
698                },
699            )
700        }
701    }
702
703    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
704    ///
705    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
706    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
707    ///
708    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
709    /// of the inputs change (including to/from null states).
710    ///
711    /// # Example
712    /// ```rust
713    /// # #[cfg(feature = "deploy")] {
714    /// # use hydro_lang::prelude::*;
715    /// # use futures::StreamExt;
716    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
717    /// let tick = process.tick();
718    /// // ticks are lazy by default, forces the later ticks to run
719    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
720    ///
721    /// let some_first_tick = tick.optional_first_tick(q!(123));
722    /// some_first_tick
723    ///     .unwrap_or(tick.singleton(q!(456)))
724    ///     .all_ticks()
725    /// # }, |mut stream| async move {
726    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
727    /// # for w in vec![123, 456, 456, 456] {
728    /// #     assert_eq!(stream.next().await.unwrap(), w);
729    /// # }
730    /// # }));
731    /// # }
732    /// ```
733    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
734        let res_option = self.or(other.into());
735        Singleton::new(
736            res_option.location.clone(),
737            HydroNode::Cast {
738                inner: Box::new(res_option.ir_node.into_inner()),
739                metadata: res_option
740                    .location
741                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
742            },
743        )
744    }
745
746    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
747    ///
748    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
749    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
750    /// so that Hydro can skip any computation on null values.
751    ///
752    /// # Example
753    /// ```rust
754    /// # #[cfg(feature = "deploy")] {
755    /// # use hydro_lang::prelude::*;
756    /// # use futures::StreamExt;
757    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
758    /// let tick = process.tick();
759    /// // ticks are lazy by default, forces the later ticks to run
760    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
761    ///
762    /// let some_first_tick = tick.optional_first_tick(q!(123));
763    /// some_first_tick.into_singleton().all_ticks()
764    /// # }, |mut stream| async move {
765    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
766    /// # for w in vec![Some(123), None, None, None] {
767    /// #     assert_eq!(stream.next().await.unwrap(), w);
768    /// # }
769    /// # }));
770    /// # }
771    /// ```
772    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
773    where
774        T: Clone,
775    {
776        let none: syn::Expr = parse_quote!(::std::option::Option::None);
777
778        let none_singleton = Singleton::new(
779            self.location.clone(),
780            HydroNode::SingletonSource {
781                value: none.into(),
782                metadata: self
783                    .location
784                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
785            },
786        );
787
788        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
789    }
790
791    /// An operator which allows you to "name" a `HydroNode`.
792    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
793    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
794        {
795            let mut node = self.ir_node.borrow_mut();
796            let metadata = node.metadata_mut();
797            metadata.tag = Some(name.to_string());
798        }
799        self
800    }
801}
802
803impl<'a, T, L> Optional<T, L, Bounded>
804where
805    L: Location<'a>,
806{
807    /// Clones this bounded optional into a tick, returning a optional that has the
808    /// same value as the outer optional. Because the outer optional is bounded, this
809    /// is deterministic because there is only a single immutable version.
810    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
811    where
812        T: Clone,
813    {
814        // TODO(shadaj): avoid printing simulator logs for this snapshot
815        self.snapshot(
816            tick,
817            nondet!(/** bounded top-level optional so deterministic */),
818        )
819    }
820
821    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
822    /// non-null. Otherwise, the stream is empty.
823    ///
824    /// # Example
825    /// ```rust
826    /// # #[cfg(feature = "deploy")] {
827    /// # use hydro_lang::prelude::*;
828    /// # use futures::StreamExt;
829    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
830    /// # let tick = process.tick();
831    /// # // ticks are lazy by default, forces the second tick to run
832    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
833    /// # let batch_first_tick = process
834    /// #   .source_iter(q!(vec![]))
835    /// #   .batch(&tick, nondet!(/** test */));
836    /// # let batch_second_tick = process
837    /// #   .source_iter(q!(vec![123, 456]))
838    /// #   .batch(&tick, nondet!(/** test */))
839    /// #   .defer_tick(); // appears on the second tick
840    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
841    /// input_batch // first tick: [], second tick: [123, 456]
842    ///     .clone()
843    ///     .max()
844    ///     .into_stream()
845    ///     .chain(input_batch)
846    ///     .all_ticks()
847    /// # }, |mut stream| async move {
848    /// // [456, 123, 456]
849    /// # for w in vec![456, 123, 456] {
850    /// #     assert_eq!(stream.next().await.unwrap(), w);
851    /// # }
852    /// # }));
853    /// # }
854    /// ```
855    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
856        Stream::new(
857            self.location.clone(),
858            HydroNode::Cast {
859                inner: Box::new(self.ir_node.into_inner()),
860                metadata: self.location.new_node_metadata(Stream::<
861                    T,
862                    Tick<L>,
863                    Bounded,
864                    TotalOrder,
865                    ExactlyOnce,
866                >::collection_kind()),
867            },
868        )
869    }
870
871    /// Filters this optional, passing through the optional value if it is non-null **and** the
872    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
873    ///
874    /// Useful for conditionally processing, such as only emitting an optional's value outside
875    /// a tick if some other condition is satisfied.
876    ///
877    /// # Example
878    /// ```rust
879    /// # #[cfg(feature = "deploy")] {
880    /// # use hydro_lang::prelude::*;
881    /// # use futures::StreamExt;
882    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
883    /// let tick = process.tick();
884    /// // ticks are lazy by default, forces the second tick to run
885    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
886    ///
887    /// let batch_first_tick = process
888    ///   .source_iter(q!(vec![]))
889    ///   .batch(&tick, nondet!(/** test */));
890    /// let batch_second_tick = process
891    ///   .source_iter(q!(vec![456]))
892    ///   .batch(&tick, nondet!(/** test */))
893    ///   .defer_tick(); // appears on the second tick
894    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
895    /// batch_first_tick.chain(batch_second_tick).first()
896    ///   .filter_if_some(some_on_first_tick)
897    ///   .unwrap_or(tick.singleton(q!(789)))
898    ///   .all_ticks()
899    /// # }, |mut stream| async move {
900    /// // [789, 789]
901    /// # for w in vec![789, 789] {
902    /// #     assert_eq!(stream.next().await.unwrap(), w);
903    /// # }
904    /// # }));
905    /// # }
906    /// ```
907    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
908        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
909    }
910
911    /// Filters this optional, passing through the optional value if it is non-null **and** the
912    /// argument (a [`Bounded`] [`Optional`]) is _null_, otherwise the output is null.
913    ///
914    /// Useful for conditionally processing, such as only emitting an optional's value outside
915    /// a tick if some other condition is satisfied.
916    ///
917    /// # Example
918    /// ```rust
919    /// # #[cfg(feature = "deploy")] {
920    /// # use hydro_lang::prelude::*;
921    /// # use futures::StreamExt;
922    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
923    /// let tick = process.tick();
924    /// // ticks are lazy by default, forces the second tick to run
925    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
926    ///
927    /// let batch_first_tick = process
928    ///   .source_iter(q!(vec![]))
929    ///   .batch(&tick, nondet!(/** test */));
930    /// let batch_second_tick = process
931    ///   .source_iter(q!(vec![456]))
932    ///   .batch(&tick, nondet!(/** test */))
933    ///   .defer_tick(); // appears on the second tick
934    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
935    /// batch_first_tick.chain(batch_second_tick).first()
936    ///   .filter_if_none(some_on_first_tick)
937    ///   .unwrap_or(tick.singleton(q!(789)))
938    ///   .all_ticks()
939    /// # }, |mut stream| async move {
940    /// // [789, 456]
941    /// # for w in vec![789, 456] {
942    /// #     assert_eq!(stream.next().await.unwrap(), w);
943    /// # }
944    /// # }));
945    /// # }
946    /// ```
947    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
948        self.filter_if_some(
949            other
950                .map(q!(|_| ()))
951                .into_singleton()
952                .filter(q!(|o| o.is_none())),
953        )
954    }
955
956    /// If `self` is null, emits a null optional, but if it is non-null, emits `value`.
957    ///
958    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
959    /// having a value, such as only releasing a piece of state if the node is the leader.
960    ///
961    /// # Example
962    /// ```rust
963    /// # #[cfg(feature = "deploy")] {
964    /// # use hydro_lang::prelude::*;
965    /// # use futures::StreamExt;
966    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
967    /// let tick = process.tick();
968    /// // ticks are lazy by default, forces the second tick to run
969    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
970    ///
971    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
972    /// some_on_first_tick
973    ///     .if_some_then(tick.singleton(q!(456)))
974    ///     .unwrap_or(tick.singleton(q!(123)))
975    /// # .all_ticks()
976    /// # }, |mut stream| async move {
977    /// // 456 (first tick) ~> 123 (second tick onwards)
978    /// # for w in vec![456, 123, 123] {
979    /// #     assert_eq!(stream.next().await.unwrap(), w);
980    /// # }
981    /// # }));
982    /// # }
983    /// ```
984    pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
985        value.filter_if_some(self)
986    }
987}
988
989impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
990where
991    L: Location<'a> + NoTick,
992{
993    /// Returns an optional value corresponding to the latest snapshot of the optional
994    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
995    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
996    /// all snapshots of this optional into the atomic-associated tick will observe the
997    /// same value each tick.
998    ///
999    /// # Non-Determinism
1000    /// Because this picks a snapshot of an optional whose value is continuously changing,
1001    /// the output optional has a non-deterministic value since the snapshot can be at an
1002    /// arbitrary point in time.
1003    pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1004        Optional::new(
1005            self.location.clone().tick,
1006            HydroNode::Batch {
1007                inner: Box::new(self.ir_node.into_inner()),
1008                metadata: self
1009                    .location
1010                    .tick
1011                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1012            },
1013        )
1014    }
1015
1016    /// Returns this optional back into a top-level, asynchronous execution context where updates
1017    /// to the value will be asynchronously propagated.
1018    pub fn end_atomic(self) -> Optional<T, L, B> {
1019        Optional::new(
1020            self.location.tick.l.clone(),
1021            HydroNode::EndAtomic {
1022                inner: Box::new(self.ir_node.into_inner()),
1023                metadata: self
1024                    .location
1025                    .tick
1026                    .l
1027                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1028            },
1029        )
1030    }
1031}
1032
1033impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1034where
1035    L: Location<'a>,
1036{
1037    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1038    /// will observe the same version of the value and will be executed synchronously before any
1039    /// outputs are yielded (in [`Optional::end_atomic`]).
1040    ///
1041    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1042    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1043    /// a different version).
1044    ///
1045    /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
1046    /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
1047    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
1048    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
1049        let out_location = Atomic { tick: tick.clone() };
1050        Optional::new(
1051            out_location.clone(),
1052            HydroNode::BeginAtomic {
1053                inner: Box::new(self.ir_node.into_inner()),
1054                metadata: out_location
1055                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1056            },
1057        )
1058    }
1059
1060    /// Given a tick, returns an optional value corresponding to a snapshot of the optional
1061    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1062    /// relevant data that contributed to the snapshot at tick `t`.
1063    ///
1064    /// # Non-Determinism
1065    /// Because this picks a snapshot of an optional whose value is continuously changing,
1066    /// the output optional has a non-deterministic value since the snapshot can be at an
1067    /// arbitrary point in time.
1068    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1069        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1070        Optional::new(
1071            tick.clone(),
1072            HydroNode::Batch {
1073                inner: Box::new(self.ir_node.into_inner()),
1074                metadata: tick
1075                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1076            },
1077        )
1078    }
1079
1080    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1081    /// with order corresponding to increasing prefixes of data contributing to the optional.
1082    ///
1083    /// # Non-Determinism
1084    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1085    /// to non-deterministic batching and arrival of inputs, the output stream is
1086    /// non-deterministic.
1087    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1088    where
1089        L: NoTick,
1090    {
1091        let tick = self.location.tick();
1092        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
1093    }
1094
1095    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1096    /// value taken at various points in time. Because the input optional may be
1097    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1098    /// represent the value of the optional given some prefix of the streams leading up to
1099    /// it.
1100    ///
1101    /// # Non-Determinism
1102    /// The output stream is non-deterministic in which elements are sampled, since this
1103    /// is controlled by a clock.
1104    pub fn sample_every(
1105        self,
1106        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1107        nondet: NonDet,
1108    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1109    where
1110        L: NoTick + NoAtomic,
1111    {
1112        let samples = self.location.source_interval(interval, nondet);
1113        let tick = self.location.tick();
1114
1115        self.snapshot(&tick, nondet)
1116            .filter_if_some(samples.batch(&tick, nondet).first())
1117            .all_ticks()
1118            .weakest_retries()
1119    }
1120}
1121
1122impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1123where
1124    L: Location<'a>,
1125{
1126    /// Asynchronously yields the value of this optional outside the tick as an unbounded stream,
1127    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1128    /// null values).
1129    ///
1130    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1131    /// producing one element in the output for each (non-null) tick. This is useful for batched
1132    /// computations, where the results from each tick must be combined together.
1133    ///
1134    /// # Example
1135    /// ```rust
1136    /// # #[cfg(feature = "deploy")] {
1137    /// # use hydro_lang::prelude::*;
1138    /// # use futures::StreamExt;
1139    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1140    /// # let tick = process.tick();
1141    /// # // ticks are lazy by default, forces the second tick to run
1142    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1143    /// # let batch_first_tick = process
1144    /// #   .source_iter(q!(vec![]))
1145    /// #   .batch(&tick, nondet!(/** test */));
1146    /// # let batch_second_tick = process
1147    /// #   .source_iter(q!(vec![1, 2, 3]))
1148    /// #   .batch(&tick, nondet!(/** test */))
1149    /// #   .defer_tick(); // appears on the second tick
1150    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1151    /// input_batch // first tick: [], second tick: [1, 2, 3]
1152    ///     .max()
1153    ///     .all_ticks()
1154    /// # }, |mut stream| async move {
1155    /// // [3]
1156    /// # for w in vec![3] {
1157    /// #     assert_eq!(stream.next().await.unwrap(), w);
1158    /// # }
1159    /// # }));
1160    /// # }
1161    /// ```
1162    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1163        self.into_stream().all_ticks()
1164    }
1165
1166    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1167    /// which will stream the value computed in _each_ tick as a separate stream element.
1168    ///
1169    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1170    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1171    /// optional's [`Tick`] context.
1172    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1173        self.into_stream().all_ticks_atomic()
1174    }
1175
1176    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1177    /// be asynchronously updated with the latest value of the optional inside the tick, including
1178    /// whether the optional is null or not.
1179    ///
1180    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1181    /// tick that tracks the inner value. This is useful for getting the value as of the
1182    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1183    ///
1184    /// # Example
1185    /// ```rust
1186    /// # #[cfg(feature = "deploy")] {
1187    /// # use hydro_lang::prelude::*;
1188    /// # use futures::StreamExt;
1189    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1190    /// # let tick = process.tick();
1191    /// # // ticks are lazy by default, forces the second tick to run
1192    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1193    /// # let batch_first_tick = process
1194    /// #   .source_iter(q!(vec![]))
1195    /// #   .batch(&tick, nondet!(/** test */));
1196    /// # let batch_second_tick = process
1197    /// #   .source_iter(q!(vec![1, 2, 3]))
1198    /// #   .batch(&tick, nondet!(/** test */))
1199    /// #   .defer_tick(); // appears on the second tick
1200    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1201    /// input_batch // first tick: [], second tick: [1, 2, 3]
1202    ///     .max()
1203    ///     .latest()
1204    /// # .into_singleton()
1205    /// # .sample_eager(nondet!(/** test */))
1206    /// # }, |mut stream| async move {
1207    /// // asynchronously changes from None ~> 3
1208    /// # for w in vec![None, Some(3)] {
1209    /// #     assert_eq!(stream.next().await.unwrap(), w);
1210    /// # }
1211    /// # }));
1212    /// # }
1213    /// ```
1214    pub fn latest(self) -> Optional<T, L, Unbounded> {
1215        Optional::new(
1216            self.location.outer().clone(),
1217            HydroNode::YieldConcat {
1218                inner: Box::new(self.ir_node.into_inner()),
1219                metadata: self
1220                    .location
1221                    .outer()
1222                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1223            },
1224        )
1225    }
1226
1227    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1228    /// be updated with the latest value of the optional inside the tick.
1229    ///
1230    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1231    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1232    /// optional's [`Tick`] context.
1233    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1234        let out_location = Atomic {
1235            tick: self.location.clone(),
1236        };
1237
1238        Optional::new(
1239            out_location.clone(),
1240            HydroNode::YieldConcat {
1241                inner: Box::new(self.ir_node.into_inner()),
1242                metadata: out_location
1243                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1244            },
1245        )
1246    }
1247
1248    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1249    /// always has the state of `self` at tick `T - 1`.
1250    ///
1251    /// At tick `0`, the output optional is null, since there is no previous tick.
1252    ///
1253    /// This operator enables stateful iterative processing with ticks, by sending data from one
1254    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1255    ///
1256    /// # Example
1257    /// ```rust
1258    /// # #[cfg(feature = "deploy")] {
1259    /// # use hydro_lang::prelude::*;
1260    /// # use futures::StreamExt;
1261    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1262    /// let tick = process.tick();
1263    /// // ticks are lazy by default, forces the second tick to run
1264    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1265    ///
1266    /// let batch_first_tick = process
1267    ///   .source_iter(q!(vec![1, 2]))
1268    ///   .batch(&tick, nondet!(/** test */));
1269    /// let batch_second_tick = process
1270    ///   .source_iter(q!(vec![3, 4]))
1271    ///   .batch(&tick, nondet!(/** test */))
1272    ///   .defer_tick(); // appears on the second tick
1273    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1274    ///   .reduce(q!(|state, v| *state += v));
1275    ///
1276    /// current_tick_sum.clone().into_singleton().zip(
1277    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1278    /// ).all_ticks()
1279    /// # }, |mut stream| async move {
1280    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1281    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1282    /// #     assert_eq!(stream.next().await.unwrap(), w);
1283    /// # }
1284    /// # }));
1285    /// # }
1286    /// ```
1287    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1288        Optional::new(
1289            self.location.clone(),
1290            HydroNode::DeferTick {
1291                input: Box::new(self.ir_node.into_inner()),
1292                metadata: self.location.new_node_metadata(Self::collection_kind()),
1293            },
1294        )
1295    }
1296}
1297
1298#[cfg(test)]
1299mod tests {
1300    #[cfg(feature = "deploy")]
1301    use futures::StreamExt;
1302    #[cfg(feature = "deploy")]
1303    use hydro_deploy::Deployment;
1304    use stageleft::q;
1305
1306    #[cfg(feature = "deploy")]
1307    use super::Optional;
1308    use crate::compile::builder::FlowBuilder;
1309    use crate::location::Location;
1310    #[cfg(feature = "deploy")]
1311    use crate::nondet::nondet;
1312
1313    #[cfg(feature = "deploy")]
1314    #[tokio::test]
1315    async fn optional_or_cardinality() {
1316        let mut deployment = Deployment::new();
1317
1318        let flow = FlowBuilder::new();
1319        let node = flow.process::<()>();
1320        let external = flow.external::<()>();
1321
1322        let node_tick = node.tick();
1323        let tick_singleton = node_tick.singleton(q!(123));
1324        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1325        let counts = tick_optional_inhabited
1326            .clone()
1327            .or(tick_optional_inhabited)
1328            .into_stream()
1329            .count()
1330            .all_ticks()
1331            .send_bincode_external(&external);
1332
1333        let nodes = flow
1334            .with_process(&node, deployment.Localhost())
1335            .with_external(&external, deployment.Localhost())
1336            .deploy(&mut deployment);
1337
1338        deployment.deploy().await.unwrap();
1339
1340        let mut external_out = nodes.connect(counts).await;
1341
1342        deployment.start().await.unwrap();
1343
1344        assert_eq!(external_out.next().await.unwrap(), 1);
1345    }
1346
1347    #[cfg(feature = "deploy")]
1348    #[tokio::test]
1349    async fn into_singleton_top_level_none_cardinality() {
1350        let mut deployment = Deployment::new();
1351
1352        let flow = FlowBuilder::new();
1353        let node = flow.process::<()>();
1354        let external = flow.external::<()>();
1355
1356        let node_tick = node.tick();
1357        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1358        let into_singleton = top_level_none.into_singleton();
1359
1360        let tick_driver = node.spin();
1361
1362        let counts = into_singleton
1363            .snapshot(&node_tick, nondet!(/** test */))
1364            .into_stream()
1365            .count()
1366            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1367            .map(q!(|(c, _)| c))
1368            .all_ticks()
1369            .send_bincode_external(&external);
1370
1371        let nodes = flow
1372            .with_process(&node, deployment.Localhost())
1373            .with_external(&external, deployment.Localhost())
1374            .deploy(&mut deployment);
1375
1376        deployment.deploy().await.unwrap();
1377
1378        let mut external_out = nodes.connect(counts).await;
1379
1380        deployment.start().await.unwrap();
1381
1382        assert_eq!(external_out.next().await.unwrap(), 1);
1383        assert_eq!(external_out.next().await.unwrap(), 1);
1384        assert_eq!(external_out.next().await.unwrap(), 1);
1385    }
1386
1387    #[cfg(feature = "deploy")]
1388    #[tokio::test]
1389    async fn into_singleton_unbounded_top_level_none_cardinality() {
1390        let mut deployment = Deployment::new();
1391
1392        let flow = FlowBuilder::new();
1393        let node = flow.process::<()>();
1394        let external = flow.external::<()>();
1395
1396        let node_tick = node.tick();
1397        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1398        let into_singleton = top_level_none.into_singleton();
1399
1400        let tick_driver = node.spin();
1401
1402        let counts = into_singleton
1403            .snapshot(&node_tick, nondet!(/** test */))
1404            .into_stream()
1405            .count()
1406            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1407            .map(q!(|(c, _)| c))
1408            .all_ticks()
1409            .send_bincode_external(&external);
1410
1411        let nodes = flow
1412            .with_process(&node, deployment.Localhost())
1413            .with_external(&external, deployment.Localhost())
1414            .deploy(&mut deployment);
1415
1416        deployment.deploy().await.unwrap();
1417
1418        let mut external_out = nodes.connect(counts).await;
1419
1420        deployment.start().await.unwrap();
1421
1422        assert_eq!(external_out.next().await.unwrap(), 1);
1423        assert_eq!(external_out.next().await.unwrap(), 1);
1424        assert_eq!(external_out.next().await.unwrap(), 1);
1425    }
1426
1427    #[cfg(feature = "sim")]
1428    #[test]
1429    fn top_level_optional_some_into_stream_no_replay() {
1430        let flow = FlowBuilder::new();
1431        let node = flow.process::<()>();
1432
1433        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1434        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1435        let filtered_some = folded.filter(q!(|_| true));
1436
1437        let out_recv = filtered_some.into_stream().sim_output();
1438
1439        flow.sim().exhaustive(async || {
1440            out_recv.assert_yields_only([10]).await;
1441        });
1442    }
1443
1444    #[cfg(feature = "sim")]
1445    #[test]
1446    fn top_level_optional_none_into_stream_no_replay() {
1447        let flow = FlowBuilder::new();
1448        let node = flow.process::<()>();
1449
1450        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1451        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1452        let filtered_none = folded.filter(q!(|_| false));
1453
1454        let out_recv = filtered_none.into_stream().sim_output();
1455
1456        flow.sim().exhaustive(async || {
1457            out_recv.assert_yields_only([] as [i32; 0]).await;
1458        });
1459    }
1460}