Skip to main content

hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, NoTick, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25
26/// A *nullable* Rust value that can asynchronously change over time.
27///
28/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
29/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
30/// asynchronously change over time, including becoming present of uninhabited.
31///
32/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
33/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
34///
35/// Type Parameters:
36/// - `Type`: the type of the value in this optional (when it is not null)
37/// - `Loc`: the [`Location`] where the optional is materialized
38/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
39pub struct Optional<Type, Loc, Bound: Boundedness> {
40    pub(crate) location: Loc,
41    pub(crate) ir_node: RefCell<HydroNode>,
42    pub(crate) flow_state: FlowState,
43
44    _phantom: PhantomData<(Type, Loc, Bound)>,
45}
46
47impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
48    fn drop(&mut self) {
49        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
50        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
51            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
52                input: Box::new(ir_node),
53                op_metadata: HydroIrOpMetadata::new(),
54            });
55        }
56    }
57}
58
59impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
60where
61    T: Clone,
62    L: Location<'a> + NoTick,
63{
64    fn from(value: Optional<T, L, Bounded>) -> Self {
65        let tick = value.location().tick();
66        value.clone_into_tick(&tick).latest()
67    }
68}
69
70impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
71where
72    L: Location<'a>,
73{
74    fn defer_tick(self) -> Self {
75        Optional::defer_tick(self)
76    }
77}
78
79impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
80where
81    L: Location<'a>,
82{
83    type Location = Tick<L>;
84
85    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
86        Optional::new(
87            location.clone(),
88            HydroNode::CycleSource {
89                cycle_id,
90                metadata: location.new_node_metadata(Self::collection_kind()),
91            },
92        )
93    }
94}
95
96impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
97where
98    L: Location<'a>,
99{
100    type Location = Tick<L>;
101
102    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
103        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
104            location.clone(),
105            HydroNode::DeferTick {
106                input: Box::new(HydroNode::CycleSource {
107                    cycle_id,
108                    metadata: location.new_node_metadata(Self::collection_kind()),
109                }),
110                metadata: location
111                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
112            },
113        );
114
115        from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
116    }
117}
118
119impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
120where
121    L: Location<'a>,
122{
123    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
124        assert_eq!(
125            Location::id(&self.location),
126            expected_location,
127            "locations do not match"
128        );
129        self.location
130            .flow_state()
131            .borrow_mut()
132            .push_root(HydroRoot::CycleSink {
133                cycle_id,
134                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
135                op_metadata: HydroIrOpMetadata::new(),
136            });
137    }
138}
139
140impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
141where
142    L: Location<'a>,
143{
144    type Location = Tick<L>;
145
146    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
147        Optional::new(
148            location.clone(),
149            HydroNode::CycleSource {
150                cycle_id,
151                metadata: location.new_node_metadata(Self::collection_kind()),
152            },
153        )
154    }
155}
156
157impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
158where
159    L: Location<'a>,
160{
161    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
162        assert_eq!(
163            Location::id(&self.location),
164            expected_location,
165            "locations do not match"
166        );
167        self.location
168            .flow_state()
169            .borrow_mut()
170            .push_root(HydroRoot::CycleSink {
171                cycle_id,
172                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
173                op_metadata: HydroIrOpMetadata::new(),
174            });
175    }
176}
177
178impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
179where
180    L: Location<'a> + NoTick,
181{
182    type Location = L;
183
184    fn create_source(cycle_id: CycleId, location: L) -> Self {
185        Optional::new(
186            location.clone(),
187            HydroNode::CycleSource {
188                cycle_id,
189                metadata: location.new_node_metadata(Self::collection_kind()),
190            },
191        )
192    }
193}
194
195impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
196where
197    L: Location<'a> + NoTick,
198{
199    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
200        assert_eq!(
201            Location::id(&self.location),
202            expected_location,
203            "locations do not match"
204        );
205        self.location
206            .flow_state()
207            .borrow_mut()
208            .push_root(HydroRoot::CycleSink {
209                cycle_id,
210                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
211                op_metadata: HydroIrOpMetadata::new(),
212            });
213    }
214}
215
216impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
217where
218    L: Location<'a>,
219{
220    fn from(singleton: Singleton<T, L, B>) -> Self {
221        Optional::new(
222            singleton.location.clone(),
223            HydroNode::Cast {
224                inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
225                metadata: singleton
226                    .location
227                    .new_node_metadata(Self::collection_kind()),
228            },
229        )
230    }
231}
232
/// Tuples two optionals at the same location by joining their IR nodes with a
/// `CrossSingleton` node. Both inputs give up their IR nodes.
#[cfg(stageleft_runtime)]
pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let left = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let right = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());

    Optional::new(
        me.location.clone(),
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
251
/// Combines two optionals at the same location with a `ChainFirst` node, with `me` as the
/// first input and `other` as the second. Both inputs give up their IR nodes.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let first = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let second = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());

    Optional::new(
        me.location.clone(),
        HydroNode::ChainFirst {
            first,
            second,
            metadata,
        },
    )
}
270
271impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
272where
273    T: Clone,
274    L: Location<'a>,
275{
276    fn clone(&self) -> Self {
277        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
278            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
279            *self.ir_node.borrow_mut() = HydroNode::Tee {
280                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
281                metadata: self.location.new_node_metadata(Self::collection_kind()),
282            };
283        }
284
285        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
286            Optional {
287                location: self.location.clone(),
288                flow_state: self.flow_state.clone(),
289                ir_node: HydroNode::Tee {
290                    inner: SharedNode(inner.0.clone()),
291                    metadata: metadata.clone(),
292                }
293                .into(),
294                _phantom: PhantomData,
295            }
296        } else {
297            unreachable!()
298        }
299    }
300}
301
302impl<'a, T, L, B: Boundedness> Optional<T, L, B>
303where
304    L: Location<'a>,
305{
306    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
307        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
308        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
309        let flow_state = location.flow_state().clone();
310        Optional {
311            location,
312            flow_state,
313            ir_node: RefCell::new(ir_node),
314            _phantom: PhantomData,
315        }
316    }
317
318    pub(crate) fn collection_kind() -> CollectionKind {
319        CollectionKind::Optional {
320            bound: B::BOUND_KIND,
321            element_type: stageleft::quote_type::<T>().into(),
322        }
323    }
324
325    /// Returns the [`Location`] where this optional is being materialized.
326    pub fn location(&self) -> &L {
327        &self.location
328    }
329
330    /// Transforms the optional value by applying a function `f` to it,
331    /// continuously as the input is updated.
332    ///
333    /// Whenever the optional is empty, the output optional is also empty.
334    ///
335    /// # Example
336    /// ```rust
337    /// # #[cfg(feature = "deploy")] {
338    /// # use hydro_lang::prelude::*;
339    /// # use futures::StreamExt;
340    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
341    /// let tick = process.tick();
342    /// let optional = tick.optional_first_tick(q!(1));
343    /// optional.map(q!(|v| v + 1)).all_ticks()
344    /// # }, |mut stream| async move {
345    /// // 2
346    /// # assert_eq!(stream.next().await.unwrap(), 2);
347    /// # }));
348    /// # }
349    /// ```
350    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
351    where
352        F: Fn(T) -> U + 'a,
353    {
354        let f = f.splice_fn1_ctx(&self.location).into();
355        Optional::new(
356            self.location.clone(),
357            HydroNode::Map {
358                f,
359                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
360                metadata: self
361                    .location
362                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
363            },
364        )
365    }
366
367    /// Transforms the optional value by applying a function `f` to it and then flattening
368    /// the result into a stream, preserving the order of elements.
369    ///
370    /// If the optional is empty, the output stream is also empty. If the optional contains
371    /// a value, `f` is applied to produce an iterator, and all items from that iterator
372    /// are emitted in the output stream in deterministic order.
373    ///
374    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
375    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
376    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
377    ///
378    /// # Example
379    /// ```rust
380    /// # #[cfg(feature = "deploy")] {
381    /// # use hydro_lang::prelude::*;
382    /// # use futures::StreamExt;
383    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
384    /// let tick = process.tick();
385    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
386    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
387    /// # }, |mut stream| async move {
388    /// // 1, 2, 3
389    /// # for w in vec![1, 2, 3] {
390    /// #     assert_eq!(stream.next().await.unwrap(), w);
391    /// # }
392    /// # }));
393    /// # }
394    /// ```
395    pub fn flat_map_ordered<U, I, F>(
396        self,
397        f: impl IntoQuotedMut<'a, F, L>,
398    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
399    where
400        B: IsBounded,
401        I: IntoIterator<Item = U>,
402        F: Fn(T) -> I + 'a,
403    {
404        self.into_stream().flat_map_ordered(f)
405    }
406
407    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
408    /// for the output type `I` to produce items in any order.
409    ///
410    /// If the optional is empty, the output stream is also empty. If the optional contains
411    /// a value, `f` is applied to produce an iterator, and all items from that iterator
412    /// are emitted in the output stream in non-deterministic order.
413    ///
414    /// # Example
415    /// ```rust
416    /// # #[cfg(feature = "deploy")] {
417    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
418    /// # use futures::StreamExt;
419    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
420    /// let tick = process.tick();
421    /// let optional = tick.optional_first_tick(q!(
422    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
423    /// ));
424    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
425    /// # }, |mut stream| async move {
426    /// // 1, 2, 3, but in no particular order
427    /// # let mut results = Vec::new();
428    /// # for _ in 0..3 {
429    /// #     results.push(stream.next().await.unwrap());
430    /// # }
431    /// # results.sort();
432    /// # assert_eq!(results, vec![1, 2, 3]);
433    /// # }));
434    /// # }
435    /// ```
436    pub fn flat_map_unordered<U, I, F>(
437        self,
438        f: impl IntoQuotedMut<'a, F, L>,
439    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
440    where
441        B: IsBounded,
442        I: IntoIterator<Item = U>,
443        F: Fn(T) -> I + 'a,
444    {
445        self.into_stream().flat_map_unordered(f)
446    }
447
448    /// Flattens the optional value into a stream, preserving the order of elements.
449    ///
450    /// If the optional is empty, the output stream is also empty. If the optional contains
451    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
452    /// in the output stream in deterministic order.
453    ///
454    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
455    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
456    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
457    ///
458    /// # Example
459    /// ```rust
460    /// # #[cfg(feature = "deploy")] {
461    /// # use hydro_lang::prelude::*;
462    /// # use futures::StreamExt;
463    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
464    /// let tick = process.tick();
465    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
466    /// optional.flatten_ordered().all_ticks()
467    /// # }, |mut stream| async move {
468    /// // 1, 2, 3
469    /// # for w in vec![1, 2, 3] {
470    /// #     assert_eq!(stream.next().await.unwrap(), w);
471    /// # }
472    /// # }));
473    /// # }
474    /// ```
475    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
476    where
477        B: IsBounded,
478        T: IntoIterator<Item = U>,
479    {
480        self.flat_map_ordered(q!(|v| v))
481    }
482
483    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
484    /// for the element type `T` to produce items in any order.
485    ///
486    /// If the optional is empty, the output stream is also empty. If the optional contains
487    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
488    /// in the output stream in non-deterministic order.
489    ///
490    /// # Example
491    /// ```rust
492    /// # #[cfg(feature = "deploy")] {
493    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
494    /// # use futures::StreamExt;
495    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
496    /// let tick = process.tick();
497    /// let optional = tick.optional_first_tick(q!(
498    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
499    /// ));
500    /// optional.flatten_unordered().all_ticks()
501    /// # }, |mut stream| async move {
502    /// // 1, 2, 3, but in no particular order
503    /// # let mut results = Vec::new();
504    /// # for _ in 0..3 {
505    /// #     results.push(stream.next().await.unwrap());
506    /// # }
507    /// # results.sort();
508    /// # assert_eq!(results, vec![1, 2, 3]);
509    /// # }));
510    /// # }
511    /// ```
512    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
513    where
514        B: IsBounded,
515        T: IntoIterator<Item = U>,
516    {
517        self.flat_map_unordered(q!(|v| v))
518    }
519
520    /// Creates an optional containing only the value if it satisfies a predicate `f`.
521    ///
522    /// If the optional is empty, the output optional is also empty. If the optional contains
523    /// a value and the predicate returns `true`, the output optional contains the same value.
524    /// If the predicate returns `false`, the output optional is empty.
525    ///
526    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
527    /// not modify or take ownership of the value. If you need to modify the value while filtering
528    /// use [`Optional::filter_map`] instead.
529    ///
530    /// # Example
531    /// ```rust
532    /// # #[cfg(feature = "deploy")] {
533    /// # use hydro_lang::prelude::*;
534    /// # use futures::StreamExt;
535    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
536    /// let tick = process.tick();
537    /// let optional = tick.optional_first_tick(q!(5));
538    /// optional.filter(q!(|&x| x > 3)).all_ticks()
539    /// # }, |mut stream| async move {
540    /// // 5
541    /// # assert_eq!(stream.next().await.unwrap(), 5);
542    /// # }));
543    /// # }
544    /// ```
545    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
546    where
547        F: Fn(&T) -> bool + 'a,
548    {
549        let f = f.splice_fn1_borrow_ctx(&self.location).into();
550        Optional::new(
551            self.location.clone(),
552            HydroNode::Filter {
553                f,
554                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
555                metadata: self.location.new_node_metadata(Self::collection_kind()),
556            },
557        )
558    }
559
560    /// An operator that both filters and maps. It yields only the value if the supplied
561    /// closure `f` returns `Some(value)`.
562    ///
563    /// If the optional is empty, the output optional is also empty. If the optional contains
564    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
565    /// If the closure returns `None`, the output optional is empty.
566    ///
567    /// # Example
568    /// ```rust
569    /// # #[cfg(feature = "deploy")] {
570    /// # use hydro_lang::prelude::*;
571    /// # use futures::StreamExt;
572    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
573    /// let tick = process.tick();
574    /// let optional = tick.optional_first_tick(q!("42"));
575    /// optional
576    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
577    ///     .all_ticks()
578    /// # }, |mut stream| async move {
579    /// // 42
580    /// # assert_eq!(stream.next().await.unwrap(), 42);
581    /// # }));
582    /// # }
583    /// ```
584    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
585    where
586        F: Fn(T) -> Option<U> + 'a,
587    {
588        let f = f.splice_fn1_ctx(&self.location).into();
589        Optional::new(
590            self.location.clone(),
591            HydroNode::FilterMap {
592                f,
593                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
594                metadata: self
595                    .location
596                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
597            },
598        )
599    }
600
    /// Combines this optional with another [`Singleton`] or [`Optional`] by tupling their values.
    ///
    /// The output will be non-null only if both this optional and the argument are non-null.
    /// This is useful for combining several pieces of state together.
605    ///
606    /// # Example
607    /// ```rust
608    /// # #[cfg(feature = "deploy")] {
609    /// # use hydro_lang::prelude::*;
610    /// # use futures::StreamExt;
611    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
612    /// let tick = process.tick();
613    /// let numbers = process
614    ///   .source_iter(q!(vec![123, 456, 789]))
615    ///   .batch(&tick, nondet!(/** test */));
616    /// let min = numbers.clone().min(); // Optional
617    /// let max = numbers.max(); // Optional
618    /// min.zip(max).all_ticks()
619    /// # }, |mut stream| async move {
620    /// // [(123, 789)]
621    /// # for w in vec![(123, 789)] {
622    /// #     assert_eq!(stream.next().await.unwrap(), w);
623    /// # }
624    /// # }));
625    /// # }
626    /// ```
627    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
628    where
629        B: IsBounded,
630    {
631        let other: Optional<O, L, B> = other.into();
632        check_matching_location(&self.location, &other.location);
633
634        if L::is_top_level()
635            && let Some(tick) = self.location.try_tick()
636        {
637            let out = zip_inside_tick(
638                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
639                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
640            )
641            .latest();
642
643            Optional::new(
644                out.location.clone(),
645                out.ir_node.replace(HydroNode::Placeholder),
646            )
647        } else {
648            zip_inside_tick(self, other)
649        }
650    }
651
652    /// Passes through `self` when it has a value, otherwise passes through `other`.
653    ///
654    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
655    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
656    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
657    ///
658    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
659    /// of the inputs change (including to/from null states).
660    ///
661    /// # Example
662    /// ```rust
663    /// # #[cfg(feature = "deploy")] {
664    /// # use hydro_lang::prelude::*;
665    /// # use futures::StreamExt;
666    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
667    /// let tick = process.tick();
668    /// // ticks are lazy by default, forces the second tick to run
669    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
670    ///
671    /// let some_first_tick = tick.optional_first_tick(q!(123));
672    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
673    /// some_first_tick.or(some_second_tick).all_ticks()
674    /// # }, |mut stream| async move {
675    /// // [123 /* first tick */, 456 /* second tick */]
676    /// # for w in vec![123, 456] {
677    /// #     assert_eq!(stream.next().await.unwrap(), w);
678    /// # }
679    /// # }));
680    /// # }
681    /// ```
682    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
683        check_matching_location(&self.location, &other.location);
684
685        if L::is_top_level()
686            && !B::BOUNDED // only if unbounded we need to use a tick
687            && let Some(tick) = self.location.try_tick()
688        {
689            let out = or_inside_tick(
690                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
691                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
692            )
693            .latest();
694
695            Optional::new(
696                out.location.clone(),
697                out.ir_node.replace(HydroNode::Placeholder),
698            )
699        } else {
700            Optional::new(
701                self.location.clone(),
702                HydroNode::ChainFirst {
703                    first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
704                    second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
705                    metadata: self.location.new_node_metadata(Self::collection_kind()),
706                },
707            )
708        }
709    }
710
711    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
712    ///
713    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
714    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
715    ///
716    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
717    /// of the inputs change (including to/from null states).
718    ///
719    /// # Example
720    /// ```rust
721    /// # #[cfg(feature = "deploy")] {
722    /// # use hydro_lang::prelude::*;
723    /// # use futures::StreamExt;
724    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
725    /// let tick = process.tick();
726    /// // ticks are lazy by default, forces the later ticks to run
727    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
728    ///
729    /// let some_first_tick = tick.optional_first_tick(q!(123));
730    /// some_first_tick
731    ///     .unwrap_or(tick.singleton(q!(456)))
732    ///     .all_ticks()
733    /// # }, |mut stream| async move {
734    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
735    /// # for w in vec![123, 456, 456, 456] {
736    /// #     assert_eq!(stream.next().await.unwrap(), w);
737    /// # }
738    /// # }));
739    /// # }
740    /// ```
741    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
742        let res_option = self.or(other.into());
743        Singleton::new(
744            res_option.location.clone(),
745            HydroNode::Cast {
746                inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
747                metadata: res_option
748                    .location
749                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
750            },
751        )
752    }
753
754    /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
755    ///
756    /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
757    /// [`Optional`] when the default value of the type is a suitable fallback.
758    ///
759    /// # Example
760    /// ```rust
761    /// # #[cfg(feature = "deploy")] {
762    /// # use hydro_lang::prelude::*;
763    /// # use futures::StreamExt;
764    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
765    /// let tick = process.tick();
766    /// // ticks are lazy by default, forces the later ticks to run
767    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
768    ///
769    /// let some_first_tick = tick.optional_first_tick(q!(123i32));
770    /// some_first_tick.unwrap_or_default().all_ticks()
771    /// # }, |mut stream| async move {
772    /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
773    /// # for w in vec![123, 0, 0, 0] {
774    /// #     assert_eq!(stream.next().await.unwrap(), w);
775    /// # }
776    /// # }));
777    /// # }
778    /// ```
779    pub fn unwrap_or_default(self) -> Singleton<T, L, B>
780    where
781        T: Default + Clone,
782    {
783        self.into_singleton().map(q!(|v| v.unwrap_or_default()))
784    }
785
786    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
787    ///
788    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
789    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
790    /// so that Hydro can skip any computation on null values.
791    ///
792    /// # Example
793    /// ```rust
794    /// # #[cfg(feature = "deploy")] {
795    /// # use hydro_lang::prelude::*;
796    /// # use futures::StreamExt;
797    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
798    /// let tick = process.tick();
799    /// // ticks are lazy by default, forces the later ticks to run
800    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
801    ///
802    /// let some_first_tick = tick.optional_first_tick(q!(123));
803    /// some_first_tick.into_singleton().all_ticks()
804    /// # }, |mut stream| async move {
805    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
806    /// # for w in vec![Some(123), None, None, None] {
807    /// #     assert_eq!(stream.next().await.unwrap(), w);
808    /// # }
809    /// # }));
810    /// # }
811    /// ```
812    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
813    where
814        T: Clone,
815    {
816        let none: syn::Expr = parse_quote!(::std::option::Option::None);
817
818        let none_singleton = Singleton::new(
819            self.location.clone(),
820            HydroNode::SingletonSource {
821                value: none.into(),
822                first_tick_only: false,
823                metadata: self
824                    .location
825                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
826            },
827        );
828
829        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
830    }
831
832    /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
833    ///
834    /// # Example
835    /// ```rust
836    /// # #[cfg(feature = "deploy")] {
837    /// # use hydro_lang::prelude::*;
838    /// # use futures::StreamExt;
839    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
840    /// let tick = process.tick();
841    /// // ticks are lazy by default, forces the second tick to run
842    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
843    ///
844    /// let some_first_tick = tick.optional_first_tick(q!(42));
845    /// some_first_tick.is_some().all_ticks()
846    /// # }, |mut stream| async move {
847    /// // [true /* first tick */, false /* second tick */, ...]
848    /// # for w in vec![true, false] {
849    /// #     assert_eq!(stream.next().await.unwrap(), w);
850    /// # }
851    /// # }));
852    /// # }
853    /// ```
854    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
855    pub fn is_some(self) -> Singleton<bool, L, B> {
856        self.map(q!(|_| ()))
857            .into_singleton()
858            .map(q!(|o| o.is_some()))
859    }
860
861    /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
862    ///
863    /// # Example
864    /// ```rust
865    /// # #[cfg(feature = "deploy")] {
866    /// # use hydro_lang::prelude::*;
867    /// # use futures::StreamExt;
868    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
869    /// let tick = process.tick();
870    /// // ticks are lazy by default, forces the second tick to run
871    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
872    ///
873    /// let some_first_tick = tick.optional_first_tick(q!(42));
874    /// some_first_tick.is_none().all_ticks()
875    /// # }, |mut stream| async move {
876    /// // [false /* first tick */, true /* second tick */, ...]
877    /// # for w in vec![false, true] {
878    /// #     assert_eq!(stream.next().await.unwrap(), w);
879    /// # }
880    /// # }));
881    /// # }
882    /// ```
883    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
884    pub fn is_none(self) -> Singleton<bool, L, B> {
885        self.map(q!(|_| ()))
886            .into_singleton()
887            .map(q!(|o| o.is_none()))
888    }
889
890    /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
891    /// values are equal, `false` otherwise (including when either is null).
892    ///
893    /// # Example
894    /// ```rust
895    /// # #[cfg(feature = "deploy")] {
896    /// # use hydro_lang::prelude::*;
897    /// # use futures::StreamExt;
898    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
899    /// let tick = process.tick();
900    /// // ticks are lazy by default, forces the second tick to run
901    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
902    ///
903    /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
904    /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
905    /// a.is_some_and_equals(b).all_ticks()
906    /// # }, |mut stream| async move {
907    /// // [true, false]
908    /// # for w in vec![true, false] {
909    /// #     assert_eq!(stream.next().await.unwrap(), w);
910    /// # }
911    /// # }));
912    /// # }
913    /// ```
914    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
915    pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
916    where
917        T: PartialEq + Clone,
918        B: IsBounded,
919    {
920        self.into_singleton()
921            .zip(other.into_singleton())
922            .map(q!(|(a, b)| a.is_some() && a == b))
923    }
924
925    /// An operator which allows you to "name" a `HydroNode`.
926    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
927    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
928        {
929            let mut node = self.ir_node.borrow_mut();
930            let metadata = node.metadata_mut();
931            metadata.tag = Some(name.to_owned());
932        }
933        self
934    }
935
936    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
937    /// implies that `B == Bounded`.
938    pub fn make_bounded(self) -> Optional<T, L, Bounded>
939    where
940        B: IsBounded,
941    {
942        Optional::new(
943            self.location.clone(),
944            self.ir_node.replace(HydroNode::Placeholder),
945        )
946    }
947
948    /// Clones this bounded optional into a tick, returning a optional that has the
949    /// same value as the outer optional. Because the outer optional is bounded, this
950    /// is deterministic because there is only a single immutable version.
951    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
952    where
953        B: IsBounded,
954        T: Clone,
955    {
956        // TODO(shadaj): avoid printing simulator logs for this snapshot
957        self.snapshot(
958            tick,
959            nondet!(/** bounded top-level optional so deterministic */),
960        )
961    }
962
963    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
964    /// non-null. Otherwise, the stream is empty.
965    ///
966    /// # Example
967    /// ```rust
968    /// # #[cfg(feature = "deploy")] {
969    /// # use hydro_lang::prelude::*;
970    /// # use futures::StreamExt;
971    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
972    /// # let tick = process.tick();
973    /// # // ticks are lazy by default, forces the second tick to run
974    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
975    /// # let batch_first_tick = process
976    /// #   .source_iter(q!(vec![]))
977    /// #   .batch(&tick, nondet!(/** test */));
978    /// # let batch_second_tick = process
979    /// #   .source_iter(q!(vec![123, 456]))
980    /// #   .batch(&tick, nondet!(/** test */))
981    /// #   .defer_tick(); // appears on the second tick
982    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
983    /// input_batch // first tick: [], second tick: [123, 456]
984    ///     .clone()
985    ///     .max()
986    ///     .into_stream()
987    ///     .chain(input_batch)
988    ///     .all_ticks()
989    /// # }, |mut stream| async move {
990    /// // [456, 123, 456]
991    /// # for w in vec![456, 123, 456] {
992    /// #     assert_eq!(stream.next().await.unwrap(), w);
993    /// # }
994    /// # }));
995    /// # }
996    /// ```
997    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
998    where
999        B: IsBounded,
1000    {
1001        Stream::new(
1002            self.location.clone(),
1003            HydroNode::Cast {
1004                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1005                metadata: self.location.new_node_metadata(Stream::<
1006                    T,
1007                    Tick<L>,
1008                    Bounded,
1009                    TotalOrder,
1010                    ExactlyOnce,
1011                >::collection_kind()),
1012            },
1013        )
1014    }
1015
1016    /// Filters this optional, passing through the value if the boolean signal is `true`,
1017    /// otherwise the output is null.
1018    ///
1019    /// # Example
1020    /// ```rust
1021    /// # #[cfg(feature = "deploy")] {
1022    /// # use hydro_lang::prelude::*;
1023    /// # use futures::StreamExt;
1024    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1025    /// let tick = process.tick();
1026    /// // ticks are lazy by default, forces the second tick to run
1027    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1028    ///
1029    /// let some_first_tick = tick.optional_first_tick(q!(()));
1030    /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1031    /// let batch_first_tick = process
1032    ///   .source_iter(q!(vec![456]))
1033    ///   .batch(&tick, nondet!(/** test */));
1034    /// let batch_second_tick = process
1035    ///   .source_iter(q!(vec![789]))
1036    ///   .batch(&tick, nondet!(/** test */))
1037    ///   .defer_tick();
1038    /// batch_first_tick.chain(batch_second_tick).first()
1039    ///   .filter_if(signal)
1040    ///   .unwrap_or(tick.singleton(q!(0)))
1041    ///   .all_ticks()
1042    /// # }, |mut stream| async move {
1043    /// // [456, 0]
1044    /// # for w in vec![456, 0] {
1045    /// #     assert_eq!(stream.next().await.unwrap(), w);
1046    /// # }
1047    /// # }));
1048    /// # }
1049    /// ```
1050    pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1051    where
1052        B: IsBounded,
1053    {
1054        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1055    }
1056
1057    /// Filters this optional, passing through the optional value if it is non-null **and** the
1058    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1059    ///
1060    /// Useful for conditionally processing, such as only emitting an optional's value outside
1061    /// a tick if some other condition is satisfied.
1062    ///
1063    /// # Example
1064    /// ```rust
1065    /// # #[cfg(feature = "deploy")] {
1066    /// # use hydro_lang::prelude::*;
1067    /// # use futures::StreamExt;
1068    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1069    /// let tick = process.tick();
1070    /// // ticks are lazy by default, forces the second tick to run
1071    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1072    ///
1073    /// let batch_first_tick = process
1074    ///   .source_iter(q!(vec![]))
1075    ///   .batch(&tick, nondet!(/** test */));
1076    /// let batch_second_tick = process
1077    ///   .source_iter(q!(vec![456]))
1078    ///   .batch(&tick, nondet!(/** test */))
1079    ///   .defer_tick(); // appears on the second tick
1080    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1081    /// batch_first_tick.chain(batch_second_tick).first()
1082    ///   .filter_if_some(some_on_first_tick)
1083    ///   .unwrap_or(tick.singleton(q!(789)))
1084    ///   .all_ticks()
1085    /// # }, |mut stream| async move {
1086    /// // [789, 789]
1087    /// # for w in vec![789, 789] {
1088    /// #     assert_eq!(stream.next().await.unwrap(), w);
1089    /// # }
1090    /// # }));
1091    /// # }
1092    /// ```
1093    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1094    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1095    where
1096        B: IsBounded,
1097    {
1098        self.filter_if(signal.is_some())
1099    }
1100
1101    /// Filters this optional, passing through the optional value if it is non-null **and** the
1102    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1103    ///
1104    /// Useful for conditionally processing, such as only emitting an optional's value outside
1105    /// a tick if some other condition is satisfied.
1106    ///
1107    /// # Example
1108    /// ```rust
1109    /// # #[cfg(feature = "deploy")] {
1110    /// # use hydro_lang::prelude::*;
1111    /// # use futures::StreamExt;
1112    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1113    /// let tick = process.tick();
1114    /// // ticks are lazy by default, forces the second tick to run
1115    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1116    ///
1117    /// let batch_first_tick = process
1118    ///   .source_iter(q!(vec![]))
1119    ///   .batch(&tick, nondet!(/** test */));
1120    /// let batch_second_tick = process
1121    ///   .source_iter(q!(vec![456]))
1122    ///   .batch(&tick, nondet!(/** test */))
1123    ///   .defer_tick(); // appears on the second tick
1124    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1125    /// batch_first_tick.chain(batch_second_tick).first()
1126    ///   .filter_if_none(some_on_first_tick)
1127    ///   .unwrap_or(tick.singleton(q!(789)))
1128    ///   .all_ticks()
1129    /// # }, |mut stream| async move {
1130    /// // [789, 789]
1131    /// # for w in vec![789, 456] {
1132    /// #     assert_eq!(stream.next().await.unwrap(), w);
1133    /// # }
1134    /// # }));
1135    /// # }
1136    /// ```
1137    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1138    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1139    where
1140        B: IsBounded,
1141    {
1142        self.filter_if(other.is_none())
1143    }
1144
1145    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1146    ///
1147    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1148    /// having a value, such as only releasing a piece of state if the node is the leader.
1149    ///
1150    /// # Example
1151    /// ```rust
1152    /// # #[cfg(feature = "deploy")] {
1153    /// # use hydro_lang::prelude::*;
1154    /// # use futures::StreamExt;
1155    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1156    /// let tick = process.tick();
1157    /// // ticks are lazy by default, forces the second tick to run
1158    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1159    ///
1160    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1161    /// some_on_first_tick
1162    ///     .if_some_then(tick.singleton(q!(456)))
1163    ///     .unwrap_or(tick.singleton(q!(123)))
1164    /// # .all_ticks()
1165    /// # }, |mut stream| async move {
1166    /// // 456 (first tick) ~> 123 (second tick onwards)
1167    /// # for w in vec![456, 123, 123] {
1168    /// #     assert_eq!(stream.next().await.unwrap(), w);
1169    /// # }
1170    /// # }));
1171    /// # }
1172    /// ```
1173    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1174    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1175    where
1176        B: IsBounded,
1177    {
1178        value.filter_if(self.is_some())
1179    }
1180}
1181
1182impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1183where
1184    L: Location<'a> + NoTick,
1185{
1186    /// Returns an optional value corresponding to the latest snapshot of the optional
1187    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1188    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1189    /// all snapshots of this optional into the atomic-associated tick will observe the
1190    /// same value each tick.
1191    ///
1192    /// # Non-Determinism
1193    /// Because this picks a snapshot of a optional whose value is continuously changing,
1194    /// the output optional has a non-deterministic value since the snapshot can be at an
1195    /// arbitrary point in time.
1196    pub fn snapshot_atomic(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1197        Optional::new(
1198            tick.clone(),
1199            HydroNode::Batch {
1200                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1201                metadata: tick
1202                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1203            },
1204        )
1205    }
1206
1207    /// Returns this optional back into a top-level, asynchronous execution context where updates
1208    /// to the value will be asynchronously propagated.
1209    pub fn end_atomic(self) -> Optional<T, L, B> {
1210        Optional::new(
1211            self.location.tick.l.clone(),
1212            HydroNode::EndAtomic {
1213                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1214                metadata: self
1215                    .location
1216                    .tick
1217                    .l
1218                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1219            },
1220        )
1221    }
1222}
1223
1224impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1225where
1226    L: Location<'a>,
1227{
1228    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1229    /// will observe the same version of the value and will be executed synchronously before any
1230    /// outputs are yielded (in [`Optional::end_atomic`]).
1231    ///
1232    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1233    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1234    /// a different version).
1235    pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1236        let id = self.location.flow_state().borrow_mut().next_clock_id();
1237        let out_location = Atomic {
1238            tick: Tick {
1239                id,
1240                l: self.location.clone(),
1241            },
1242        };
1243        Optional::new(
1244            out_location.clone(),
1245            HydroNode::BeginAtomic {
1246                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1247                metadata: out_location
1248                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1249            },
1250        )
1251    }
1252
1253    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1254    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1255    /// relevant data that contributed to the snapshot at tick `t`.
1256    ///
1257    /// # Non-Determinism
1258    /// Because this picks a snapshot of a optional whose value is continuously changing,
1259    /// the output optional has a non-deterministic value since the snapshot can be at an
1260    /// arbitrary point in time.
1261    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1262        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1263        Optional::new(
1264            tick.clone(),
1265            HydroNode::Batch {
1266                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1267                metadata: tick
1268                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1269            },
1270        )
1271    }
1272
1273    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1274    /// with order corresponding to increasing prefixes of data contributing to the optional.
1275    ///
1276    /// # Non-Determinism
1277    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1278    /// to non-deterministic batching and arrival of inputs, the output stream is
1279    /// non-deterministic.
1280    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1281    where
1282        L: NoTick,
1283    {
1284        let tick = self.location.tick();
1285        self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1286    }
1287
1288    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1289    /// value taken at various points in time. Because the input optional may be
1290    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1291    /// represent the value of the optional given some prefix of the streams leading up to
1292    /// it.
1293    ///
1294    /// # Non-Determinism
1295    /// The output stream is non-deterministic in which elements are sampled, since this
1296    /// is controlled by a clock.
1297    pub fn sample_every(
1298        self,
1299        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1300        nondet: NonDet,
1301    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1302    where
1303        L: NoTick + NoAtomic,
1304    {
1305        let samples = self.location.source_interval(interval, nondet);
1306        let tick = self.location.tick();
1307
1308        self.snapshot(&tick, nondet)
1309            .filter_if(samples.batch(&tick, nondet).first().is_some())
1310            .all_ticks()
1311            .weaken_retries()
1312    }
1313}
1314
1315impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1316where
1317    L: Location<'a>,
1318{
1319    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1320    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1321    /// null values).
1322    ///
1323    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1324    /// producing one element in the output for each (non-null) tick. This is useful for batched
1325    /// computations, where the results from each tick must be combined together.
1326    ///
1327    /// # Example
1328    /// ```rust
1329    /// # #[cfg(feature = "deploy")] {
1330    /// # use hydro_lang::prelude::*;
1331    /// # use futures::StreamExt;
1332    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1333    /// # let tick = process.tick();
1334    /// # // ticks are lazy by default, forces the second tick to run
1335    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1336    /// # let batch_first_tick = process
1337    /// #   .source_iter(q!(vec![]))
1338    /// #   .batch(&tick, nondet!(/** test */));
1339    /// # let batch_second_tick = process
1340    /// #   .source_iter(q!(vec![1, 2, 3]))
1341    /// #   .batch(&tick, nondet!(/** test */))
1342    /// #   .defer_tick(); // appears on the second tick
1343    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1344    /// input_batch // first tick: [], second tick: [1, 2, 3]
1345    ///     .max()
1346    ///     .all_ticks()
1347    /// # }, |mut stream| async move {
1348    /// // [3]
1349    /// # for w in vec![3] {
1350    /// #     assert_eq!(stream.next().await.unwrap(), w);
1351    /// # }
1352    /// # }));
1353    /// # }
1354    /// ```
1355    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1356        self.into_stream().all_ticks()
1357    }
1358
1359    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1360    /// which will stream the value computed in _each_ tick as a separate stream element.
1361    ///
1362    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1363    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1364    /// optional's [`Tick`] context.
1365    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1366        self.into_stream().all_ticks_atomic()
1367    }
1368
1369    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1370    /// be asynchronously updated with the latest value of the optional inside the tick, including
1371    /// whether the optional is null or not.
1372    ///
1373    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1374    /// tick that tracks the inner value. This is useful for getting the value as of the
1375    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1376    ///
1377    /// # Example
1378    /// ```rust
1379    /// # #[cfg(feature = "deploy")] {
1380    /// # use hydro_lang::prelude::*;
1381    /// # use futures::StreamExt;
1382    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1383    /// # let tick = process.tick();
1384    /// # // ticks are lazy by default, forces the second tick to run
1385    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1386    /// # let batch_first_tick = process
1387    /// #   .source_iter(q!(vec![]))
1388    /// #   .batch(&tick, nondet!(/** test */));
1389    /// # let batch_second_tick = process
1390    /// #   .source_iter(q!(vec![1, 2, 3]))
1391    /// #   .batch(&tick, nondet!(/** test */))
1392    /// #   .defer_tick(); // appears on the second tick
1393    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1394    /// input_batch // first tick: [], second tick: [1, 2, 3]
1395    ///     .max()
1396    ///     .latest()
1397    /// # .into_singleton()
1398    /// # .sample_eager(nondet!(/** test */))
1399    /// # }, |mut stream| async move {
1400    /// // asynchronously changes from None ~> 3
1401    /// # for w in vec![None, Some(3)] {
1402    /// #     assert_eq!(stream.next().await.unwrap(), w);
1403    /// # }
1404    /// # }));
1405    /// # }
1406    /// ```
1407    pub fn latest(self) -> Optional<T, L, Unbounded> {
1408        Optional::new(
1409            self.location.outer().clone(),
1410            HydroNode::YieldConcat {
1411                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1412                metadata: self
1413                    .location
1414                    .outer()
1415                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1416            },
1417        )
1418    }
1419
1420    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1421    /// be updated with the latest value of the optional inside the tick.
1422    ///
1423    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1424    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1425    /// optional's [`Tick`] context.
1426    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1427        let out_location = Atomic {
1428            tick: self.location.clone(),
1429        };
1430
1431        Optional::new(
1432            out_location.clone(),
1433            HydroNode::YieldConcat {
1434                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1435                metadata: out_location
1436                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1437            },
1438        )
1439    }
1440
1441    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1442    /// always has the state of `self` at tick `T - 1`.
1443    ///
1444    /// At tick `0`, the output optional is null, since there is no previous tick.
1445    ///
1446    /// This operator enables stateful iterative processing with ticks, by sending data from one
1447    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1448    ///
1449    /// # Example
1450    /// ```rust
1451    /// # #[cfg(feature = "deploy")] {
1452    /// # use hydro_lang::prelude::*;
1453    /// # use futures::StreamExt;
1454    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1455    /// let tick = process.tick();
1456    /// // ticks are lazy by default, forces the second tick to run
1457    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1458    ///
1459    /// let batch_first_tick = process
1460    ///   .source_iter(q!(vec![1, 2]))
1461    ///   .batch(&tick, nondet!(/** test */));
1462    /// let batch_second_tick = process
1463    ///   .source_iter(q!(vec![3, 4]))
1464    ///   .batch(&tick, nondet!(/** test */))
1465    ///   .defer_tick(); // appears on the second tick
1466    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1467    ///   .reduce(q!(|state, v| *state += v));
1468    ///
1469    /// current_tick_sum.clone().into_singleton().zip(
1470    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1471    /// ).all_ticks()
1472    /// # }, |mut stream| async move {
1473    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1474    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1475    /// #     assert_eq!(stream.next().await.unwrap(), w);
1476    /// # }
1477    /// # }));
1478    /// # }
1479    /// ```
1480    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1481        Optional::new(
1482            self.location.clone(),
1483            HydroNode::DeferTick {
1484                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1485                metadata: self.location.new_node_metadata(Self::collection_kind()),
1486            },
1487        )
1488    }
1489}
1490
1491#[cfg(test)]
1492mod tests {
1493    #[cfg(feature = "deploy")]
1494    use futures::StreamExt;
1495    #[cfg(feature = "deploy")]
1496    use hydro_deploy::Deployment;
1497    #[cfg(any(feature = "deploy", feature = "sim"))]
1498    use stageleft::q;
1499
1500    #[cfg(feature = "deploy")]
1501    use super::Optional;
1502    #[cfg(any(feature = "deploy", feature = "sim"))]
1503    use crate::compile::builder::FlowBuilder;
1504    #[cfg(any(feature = "deploy", feature = "sim"))]
1505    use crate::location::Location;
1506    #[cfg(feature = "deploy")]
1507    use crate::nondet::nondet;
1508
1509    #[cfg(feature = "deploy")]
1510    #[tokio::test]
1511    async fn optional_or_cardinality() {
1512        let mut deployment = Deployment::new();
1513
1514        let mut flow = FlowBuilder::new();
1515        let node = flow.process::<()>();
1516        let external = flow.external::<()>();
1517
1518        let node_tick = node.tick();
1519        let tick_singleton = node_tick.singleton(q!(123));
1520        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1521        let counts = tick_optional_inhabited
1522            .clone()
1523            .or(tick_optional_inhabited)
1524            .into_stream()
1525            .count()
1526            .all_ticks()
1527            .send_bincode_external(&external);
1528
1529        let nodes = flow
1530            .with_process(&node, deployment.Localhost())
1531            .with_external(&external, deployment.Localhost())
1532            .deploy(&mut deployment);
1533
1534        deployment.deploy().await.unwrap();
1535
1536        let mut external_out = nodes.connect(counts).await;
1537
1538        deployment.start().await.unwrap();
1539
1540        assert_eq!(external_out.next().await.unwrap(), 1);
1541    }
1542
1543    #[cfg(feature = "deploy")]
1544    #[tokio::test]
1545    async fn into_singleton_top_level_none_cardinality() {
1546        let mut deployment = Deployment::new();
1547
1548        let mut flow = FlowBuilder::new();
1549        let node = flow.process::<()>();
1550        let external = flow.external::<()>();
1551
1552        let node_tick = node.tick();
1553        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1554        let into_singleton = top_level_none.into_singleton();
1555
1556        let tick_driver = node.spin();
1557
1558        let counts = into_singleton
1559            .snapshot(&node_tick, nondet!(/** test */))
1560            .into_stream()
1561            .count()
1562            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1563            .map(q!(|(c, _)| c))
1564            .all_ticks()
1565            .send_bincode_external(&external);
1566
1567        let nodes = flow
1568            .with_process(&node, deployment.Localhost())
1569            .with_external(&external, deployment.Localhost())
1570            .deploy(&mut deployment);
1571
1572        deployment.deploy().await.unwrap();
1573
1574        let mut external_out = nodes.connect(counts).await;
1575
1576        deployment.start().await.unwrap();
1577
1578        assert_eq!(external_out.next().await.unwrap(), 1);
1579        assert_eq!(external_out.next().await.unwrap(), 1);
1580        assert_eq!(external_out.next().await.unwrap(), 1);
1581    }
1582
1583    #[cfg(feature = "deploy")]
1584    #[tokio::test]
1585    async fn into_singleton_unbounded_top_level_none_cardinality() {
1586        let mut deployment = Deployment::new();
1587
1588        let mut flow = FlowBuilder::new();
1589        let node = flow.process::<()>();
1590        let external = flow.external::<()>();
1591
1592        let node_tick = node.tick();
1593        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1594        let into_singleton = top_level_none.into_singleton();
1595
1596        let tick_driver = node.spin();
1597
1598        let counts = into_singleton
1599            .snapshot(&node_tick, nondet!(/** test */))
1600            .into_stream()
1601            .count()
1602            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1603            .map(q!(|(c, _)| c))
1604            .all_ticks()
1605            .send_bincode_external(&external);
1606
1607        let nodes = flow
1608            .with_process(&node, deployment.Localhost())
1609            .with_external(&external, deployment.Localhost())
1610            .deploy(&mut deployment);
1611
1612        deployment.deploy().await.unwrap();
1613
1614        let mut external_out = nodes.connect(counts).await;
1615
1616        deployment.start().await.unwrap();
1617
1618        assert_eq!(external_out.next().await.unwrap(), 1);
1619        assert_eq!(external_out.next().await.unwrap(), 1);
1620        assert_eq!(external_out.next().await.unwrap(), 1);
1621    }
1622
1623    #[cfg(feature = "sim")]
1624    #[test]
1625    fn top_level_optional_some_into_stream_no_replay() {
1626        let mut flow = FlowBuilder::new();
1627        let node = flow.process::<()>();
1628
1629        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1630        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1631        let filtered_some = folded.filter(q!(|_| true));
1632
1633        let out_recv = filtered_some.into_stream().sim_output();
1634
1635        flow.sim().exhaustive(async || {
1636            out_recv.assert_yields_only([10]).await;
1637        });
1638    }
1639
1640    #[cfg(feature = "sim")]
1641    #[test]
1642    fn top_level_optional_none_into_stream_no_replay() {
1643        let mut flow = FlowBuilder::new();
1644        let node = flow.process::<()>();
1645
1646        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1647        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1648        let filtered_none = folded.filter(q!(|_| false));
1649
1650        let out_recv = filtered_none.into_stream().sim_output();
1651
1652        flow.sim().exhaustive(async || {
1653            out_recv.assert_yields_only([] as [i32; 0]).await;
1654        });
1655    }
1656}