Skip to main content

hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, DeferTick, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A *nullable* Rust value that can asynchronously change over time.
26///
27/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
28/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
29/// asynchronously change over time, including becoming present of uninhabited.
30///
31/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
32/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
33///
34/// Type Parameters:
35/// - `Type`: the type of the value in this optional (when it is not null)
36/// - `Loc`: the [`Location`] where the optional is materialized
37/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
38pub struct Optional<Type, Loc, Bound: Boundedness> {
39    pub(crate) location: Loc,
40    pub(crate) ir_node: RefCell<HydroNode>,
41    pub(crate) flow_state: FlowState,
42
43    _phantom: PhantomData<(Type, Loc, Bound)>,
44}
45
46impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
47    fn drop(&mut self) {
48        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
49        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
50            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
51                input: Box::new(ir_node),
52                op_metadata: HydroIrOpMetadata::new(),
53            });
54        }
55    }
56}
57
58impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
59where
60    T: Clone,
61    L: Location<'a> + NoTick,
62{
63    fn from(value: Optional<T, L, Bounded>) -> Self {
64        let tick = value.location().tick();
65        value.clone_into_tick(&tick).latest()
66    }
67}
68
69impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
70where
71    L: Location<'a>,
72{
73    fn defer_tick(self) -> Self {
74        Optional::defer_tick(self)
75    }
76}
77
78impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
79where
80    L: Location<'a>,
81{
82    type Location = Tick<L>;
83
84    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
85        Optional::new(
86            location.clone(),
87            HydroNode::CycleSource {
88                cycle_id,
89                metadata: location.new_node_metadata(Self::collection_kind()),
90            },
91        )
92    }
93}
94
95impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
96where
97    L: Location<'a>,
98{
99    type Location = Tick<L>;
100
101    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
102        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
103            location.clone(),
104            HydroNode::DeferTick {
105                input: Box::new(HydroNode::CycleSource {
106                    cycle_id,
107                    metadata: location.new_node_metadata(Self::collection_kind()),
108                }),
109                metadata: location
110                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
111            },
112        );
113
114        from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
115    }
116}
117
118impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
119where
120    L: Location<'a>,
121{
122    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
123        assert_eq!(
124            Location::id(&self.location),
125            expected_location,
126            "locations do not match"
127        );
128        self.location
129            .flow_state()
130            .borrow_mut()
131            .push_root(HydroRoot::CycleSink {
132                cycle_id,
133                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
134                op_metadata: HydroIrOpMetadata::new(),
135            });
136    }
137}
138
139impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
140where
141    L: Location<'a>,
142{
143    type Location = Tick<L>;
144
145    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
146        Optional::new(
147            location.clone(),
148            HydroNode::CycleSource {
149                cycle_id,
150                metadata: location.new_node_metadata(Self::collection_kind()),
151            },
152        )
153    }
154}
155
156impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
157where
158    L: Location<'a>,
159{
160    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
161        assert_eq!(
162            Location::id(&self.location),
163            expected_location,
164            "locations do not match"
165        );
166        self.location
167            .flow_state()
168            .borrow_mut()
169            .push_root(HydroRoot::CycleSink {
170                cycle_id,
171                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
172                op_metadata: HydroIrOpMetadata::new(),
173            });
174    }
175}
176
177impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
178where
179    L: Location<'a> + NoTick,
180{
181    type Location = L;
182
183    fn create_source(cycle_id: CycleId, location: L) -> Self {
184        Optional::new(
185            location.clone(),
186            HydroNode::CycleSource {
187                cycle_id,
188                metadata: location.new_node_metadata(Self::collection_kind()),
189            },
190        )
191    }
192}
193
194impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
195where
196    L: Location<'a> + NoTick,
197{
198    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
199        assert_eq!(
200            Location::id(&self.location),
201            expected_location,
202            "locations do not match"
203        );
204        self.location
205            .flow_state()
206            .borrow_mut()
207            .push_root(HydroRoot::CycleSink {
208                cycle_id,
209                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
210                op_metadata: HydroIrOpMetadata::new(),
211            });
212    }
213}
214
215impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
216where
217    L: Location<'a>,
218{
219    fn from(singleton: Singleton<T, L, B>) -> Self {
220        Optional::new(
221            singleton.location.clone(),
222            HydroNode::Cast {
223                inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
224                metadata: singleton
225                    .location
226                    .new_node_metadata(Self::collection_kind()),
227            },
228        )
229    }
230}
231
232#[cfg(stageleft_runtime)]
233pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
234    me: Optional<T, L, B>,
235    other: Optional<O, L, B>,
236) -> Optional<(T, O), L, B> {
237    check_matching_location(&me.location, &other.location);
238
239    Optional::new(
240        me.location.clone(),
241        HydroNode::CrossSingleton {
242            left: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
243            right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
244            metadata: me
245                .location
246                .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
247        },
248    )
249}
250
251#[cfg(stageleft_runtime)]
252fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
253    me: Optional<T, L, B>,
254    other: Optional<T, L, B>,
255) -> Optional<T, L, B> {
256    check_matching_location(&me.location, &other.location);
257
258    Optional::new(
259        me.location.clone(),
260        HydroNode::ChainFirst {
261            first: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
262            second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
263            metadata: me
264                .location
265                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
266        },
267    )
268}
269
270impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
271where
272    T: Clone,
273    L: Location<'a>,
274{
275    fn clone(&self) -> Self {
276        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
277            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
278            *self.ir_node.borrow_mut() = HydroNode::Tee {
279                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
280                metadata: self.location.new_node_metadata(Self::collection_kind()),
281            };
282        }
283
284        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
285            Optional {
286                location: self.location.clone(),
287                flow_state: self.flow_state.clone(),
288                ir_node: HydroNode::Tee {
289                    inner: SharedNode(inner.0.clone()),
290                    metadata: metadata.clone(),
291                }
292                .into(),
293                _phantom: PhantomData,
294            }
295        } else {
296            unreachable!()
297        }
298    }
299}
300
301impl<'a, T, L, B: Boundedness> Optional<T, L, B>
302where
303    L: Location<'a>,
304{
305    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
306        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
307        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
308        let flow_state = location.flow_state().clone();
309        Optional {
310            location,
311            flow_state,
312            ir_node: RefCell::new(ir_node),
313            _phantom: PhantomData,
314        }
315    }
316
317    pub(crate) fn collection_kind() -> CollectionKind {
318        CollectionKind::Optional {
319            bound: B::BOUND_KIND,
320            element_type: stageleft::quote_type::<T>().into(),
321        }
322    }
323
324    /// Returns the [`Location`] where this optional is being materialized.
325    pub fn location(&self) -> &L {
326        &self.location
327    }
328
329    /// Transforms the optional value by applying a function `f` to it,
330    /// continuously as the input is updated.
331    ///
332    /// Whenever the optional is empty, the output optional is also empty.
333    ///
334    /// # Example
335    /// ```rust
336    /// # #[cfg(feature = "deploy")] {
337    /// # use hydro_lang::prelude::*;
338    /// # use futures::StreamExt;
339    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
340    /// let tick = process.tick();
341    /// let optional = tick.optional_first_tick(q!(1));
342    /// optional.map(q!(|v| v + 1)).all_ticks()
343    /// # }, |mut stream| async move {
344    /// // 2
345    /// # assert_eq!(stream.next().await.unwrap(), 2);
346    /// # }));
347    /// # }
348    /// ```
349    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
350    where
351        F: Fn(T) -> U + 'a,
352    {
353        let f = f.splice_fn1_ctx(&self.location).into();
354        Optional::new(
355            self.location.clone(),
356            HydroNode::Map {
357                f,
358                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
359                metadata: self
360                    .location
361                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
362            },
363        )
364    }
365
366    /// Transforms the optional value by applying a function `f` to it and then flattening
367    /// the result into a stream, preserving the order of elements.
368    ///
369    /// If the optional is empty, the output stream is also empty. If the optional contains
370    /// a value, `f` is applied to produce an iterator, and all items from that iterator
371    /// are emitted in the output stream in deterministic order.
372    ///
373    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
374    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
375    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
376    ///
377    /// # Example
378    /// ```rust
379    /// # #[cfg(feature = "deploy")] {
380    /// # use hydro_lang::prelude::*;
381    /// # use futures::StreamExt;
382    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
383    /// let tick = process.tick();
384    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
385    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
386    /// # }, |mut stream| async move {
387    /// // 1, 2, 3
388    /// # for w in vec![1, 2, 3] {
389    /// #     assert_eq!(stream.next().await.unwrap(), w);
390    /// # }
391    /// # }));
392    /// # }
393    /// ```
394    pub fn flat_map_ordered<U, I, F>(
395        self,
396        f: impl IntoQuotedMut<'a, F, L>,
397    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
398    where
399        I: IntoIterator<Item = U>,
400        F: Fn(T) -> I + 'a,
401    {
402        let f = f.splice_fn1_ctx(&self.location).into();
403        Stream::new(
404            self.location.clone(),
405            HydroNode::FlatMap {
406                f,
407                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
408                metadata: self.location.new_node_metadata(
409                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
410                ),
411            },
412        )
413    }
414
415    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
416    /// for the output type `I` to produce items in any order.
417    ///
418    /// If the optional is empty, the output stream is also empty. If the optional contains
419    /// a value, `f` is applied to produce an iterator, and all items from that iterator
420    /// are emitted in the output stream in non-deterministic order.
421    ///
422    /// # Example
423    /// ```rust
424    /// # #[cfg(feature = "deploy")] {
425    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
426    /// # use futures::StreamExt;
427    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
428    /// let tick = process.tick();
429    /// let optional = tick.optional_first_tick(q!(
430    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
431    /// ));
432    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
433    /// # }, |mut stream| async move {
434    /// // 1, 2, 3, but in no particular order
435    /// # let mut results = Vec::new();
436    /// # for _ in 0..3 {
437    /// #     results.push(stream.next().await.unwrap());
438    /// # }
439    /// # results.sort();
440    /// # assert_eq!(results, vec![1, 2, 3]);
441    /// # }));
442    /// # }
443    /// ```
444    pub fn flat_map_unordered<U, I, F>(
445        self,
446        f: impl IntoQuotedMut<'a, F, L>,
447    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
448    where
449        I: IntoIterator<Item = U>,
450        F: Fn(T) -> I + 'a,
451    {
452        let f = f.splice_fn1_ctx(&self.location).into();
453        Stream::new(
454            self.location.clone(),
455            HydroNode::FlatMap {
456                f,
457                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
458                metadata: self
459                    .location
460                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
461            },
462        )
463    }
464
465    /// Flattens the optional value into a stream, preserving the order of elements.
466    ///
467    /// If the optional is empty, the output stream is also empty. If the optional contains
468    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
469    /// in the output stream in deterministic order.
470    ///
471    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
472    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
473    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
474    ///
475    /// # Example
476    /// ```rust
477    /// # #[cfg(feature = "deploy")] {
478    /// # use hydro_lang::prelude::*;
479    /// # use futures::StreamExt;
480    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
481    /// let tick = process.tick();
482    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
483    /// optional.flatten_ordered().all_ticks()
484    /// # }, |mut stream| async move {
485    /// // 1, 2, 3
486    /// # for w in vec![1, 2, 3] {
487    /// #     assert_eq!(stream.next().await.unwrap(), w);
488    /// # }
489    /// # }));
490    /// # }
491    /// ```
492    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
493    where
494        T: IntoIterator<Item = U>,
495    {
496        self.flat_map_ordered(q!(|v| v))
497    }
498
499    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
500    /// for the element type `T` to produce items in any order.
501    ///
502    /// If the optional is empty, the output stream is also empty. If the optional contains
503    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
504    /// in the output stream in non-deterministic order.
505    ///
506    /// # Example
507    /// ```rust
508    /// # #[cfg(feature = "deploy")] {
509    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
510    /// # use futures::StreamExt;
511    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
512    /// let tick = process.tick();
513    /// let optional = tick.optional_first_tick(q!(
514    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
515    /// ));
516    /// optional.flatten_unordered().all_ticks()
517    /// # }, |mut stream| async move {
518    /// // 1, 2, 3, but in no particular order
519    /// # let mut results = Vec::new();
520    /// # for _ in 0..3 {
521    /// #     results.push(stream.next().await.unwrap());
522    /// # }
523    /// # results.sort();
524    /// # assert_eq!(results, vec![1, 2, 3]);
525    /// # }));
526    /// # }
527    /// ```
528    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
529    where
530        T: IntoIterator<Item = U>,
531    {
532        self.flat_map_unordered(q!(|v| v))
533    }
534
535    /// Creates an optional containing only the value if it satisfies a predicate `f`.
536    ///
537    /// If the optional is empty, the output optional is also empty. If the optional contains
538    /// a value and the predicate returns `true`, the output optional contains the same value.
539    /// If the predicate returns `false`, the output optional is empty.
540    ///
541    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
542    /// not modify or take ownership of the value. If you need to modify the value while filtering
543    /// use [`Optional::filter_map`] instead.
544    ///
545    /// # Example
546    /// ```rust
547    /// # #[cfg(feature = "deploy")] {
548    /// # use hydro_lang::prelude::*;
549    /// # use futures::StreamExt;
550    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
551    /// let tick = process.tick();
552    /// let optional = tick.optional_first_tick(q!(5));
553    /// optional.filter(q!(|&x| x > 3)).all_ticks()
554    /// # }, |mut stream| async move {
555    /// // 5
556    /// # assert_eq!(stream.next().await.unwrap(), 5);
557    /// # }));
558    /// # }
559    /// ```
560    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
561    where
562        F: Fn(&T) -> bool + 'a,
563    {
564        let f = f.splice_fn1_borrow_ctx(&self.location).into();
565        Optional::new(
566            self.location.clone(),
567            HydroNode::Filter {
568                f,
569                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
570                metadata: self.location.new_node_metadata(Self::collection_kind()),
571            },
572        )
573    }
574
575    /// An operator that both filters and maps. It yields only the value if the supplied
576    /// closure `f` returns `Some(value)`.
577    ///
578    /// If the optional is empty, the output optional is also empty. If the optional contains
579    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
580    /// If the closure returns `None`, the output optional is empty.
581    ///
582    /// # Example
583    /// ```rust
584    /// # #[cfg(feature = "deploy")] {
585    /// # use hydro_lang::prelude::*;
586    /// # use futures::StreamExt;
587    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
588    /// let tick = process.tick();
589    /// let optional = tick.optional_first_tick(q!("42"));
590    /// optional
591    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
592    ///     .all_ticks()
593    /// # }, |mut stream| async move {
594    /// // 42
595    /// # assert_eq!(stream.next().await.unwrap(), 42);
596    /// # }));
597    /// # }
598    /// ```
599    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
600    where
601        F: Fn(T) -> Option<U> + 'a,
602    {
603        let f = f.splice_fn1_ctx(&self.location).into();
604        Optional::new(
605            self.location.clone(),
606            HydroNode::FilterMap {
607                f,
608                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
609                metadata: self
610                    .location
611                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
612            },
613        )
614    }
615
616    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
617    ///
618    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
619    /// non-null. This is useful for combining several pieces of state together.
620    ///
621    /// # Example
622    /// ```rust
623    /// # #[cfg(feature = "deploy")] {
624    /// # use hydro_lang::prelude::*;
625    /// # use futures::StreamExt;
626    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
627    /// let tick = process.tick();
628    /// let numbers = process
629    ///   .source_iter(q!(vec![123, 456, 789]))
630    ///   .batch(&tick, nondet!(/** test */));
631    /// let min = numbers.clone().min(); // Optional
632    /// let max = numbers.max(); // Optional
633    /// min.zip(max).all_ticks()
634    /// # }, |mut stream| async move {
635    /// // [(123, 789)]
636    /// # for w in vec![(123, 789)] {
637    /// #     assert_eq!(stream.next().await.unwrap(), w);
638    /// # }
639    /// # }));
640    /// # }
641    /// ```
642    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
643    where
644        B: IsBounded,
645    {
646        let other: Optional<O, L, B> = other.into();
647        check_matching_location(&self.location, &other.location);
648
649        if L::is_top_level()
650            && let Some(tick) = self.location.try_tick()
651        {
652            let out = zip_inside_tick(
653                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
654                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
655            )
656            .latest();
657
658            Optional::new(
659                out.location.clone(),
660                out.ir_node.replace(HydroNode::Placeholder),
661            )
662        } else {
663            zip_inside_tick(self, other)
664        }
665    }
666
667    /// Passes through `self` when it has a value, otherwise passes through `other`.
668    ///
669    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
670    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
671    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
672    ///
673    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
674    /// of the inputs change (including to/from null states).
675    ///
676    /// # Example
677    /// ```rust
678    /// # #[cfg(feature = "deploy")] {
679    /// # use hydro_lang::prelude::*;
680    /// # use futures::StreamExt;
681    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
682    /// let tick = process.tick();
683    /// // ticks are lazy by default, forces the second tick to run
684    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
685    ///
686    /// let some_first_tick = tick.optional_first_tick(q!(123));
687    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
688    /// some_first_tick.or(some_second_tick).all_ticks()
689    /// # }, |mut stream| async move {
690    /// // [123 /* first tick */, 456 /* second tick */]
691    /// # for w in vec![123, 456] {
692    /// #     assert_eq!(stream.next().await.unwrap(), w);
693    /// # }
694    /// # }));
695    /// # }
696    /// ```
697    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
698        check_matching_location(&self.location, &other.location);
699
700        if L::is_top_level()
701            && !B::BOUNDED // only if unbounded we need to use a tick
702            && let Some(tick) = self.location.try_tick()
703        {
704            let out = or_inside_tick(
705                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
706                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
707            )
708            .latest();
709
710            Optional::new(
711                out.location.clone(),
712                out.ir_node.replace(HydroNode::Placeholder),
713            )
714        } else {
715            Optional::new(
716                self.location.clone(),
717                HydroNode::ChainFirst {
718                    first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
719                    second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
720                    metadata: self.location.new_node_metadata(Self::collection_kind()),
721                },
722            )
723        }
724    }
725
726    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
727    ///
728    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
729    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
730    ///
731    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
732    /// of the inputs change (including to/from null states).
733    ///
734    /// # Example
735    /// ```rust
736    /// # #[cfg(feature = "deploy")] {
737    /// # use hydro_lang::prelude::*;
738    /// # use futures::StreamExt;
739    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
740    /// let tick = process.tick();
741    /// // ticks are lazy by default, forces the later ticks to run
742    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
743    ///
744    /// let some_first_tick = tick.optional_first_tick(q!(123));
745    /// some_first_tick
746    ///     .unwrap_or(tick.singleton(q!(456)))
747    ///     .all_ticks()
748    /// # }, |mut stream| async move {
749    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
750    /// # for w in vec![123, 456, 456, 456] {
751    /// #     assert_eq!(stream.next().await.unwrap(), w);
752    /// # }
753    /// # }));
754    /// # }
755    /// ```
756    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
757        let res_option = self.or(other.into());
758        Singleton::new(
759            res_option.location.clone(),
760            HydroNode::Cast {
761                inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
762                metadata: res_option
763                    .location
764                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
765            },
766        )
767    }
768
769    /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
770    ///
771    /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
772    /// [`Optional`] when the default value of the type is a suitable fallback.
773    ///
774    /// # Example
775    /// ```rust
776    /// # #[cfg(feature = "deploy")] {
777    /// # use hydro_lang::prelude::*;
778    /// # use futures::StreamExt;
779    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
780    /// let tick = process.tick();
781    /// // ticks are lazy by default, forces the later ticks to run
782    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
783    ///
784    /// let some_first_tick = tick.optional_first_tick(q!(123i32));
785    /// some_first_tick.unwrap_or_default().all_ticks()
786    /// # }, |mut stream| async move {
787    /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
788    /// # for w in vec![123, 0, 0, 0] {
789    /// #     assert_eq!(stream.next().await.unwrap(), w);
790    /// # }
791    /// # }));
792    /// # }
793    /// ```
794    pub fn unwrap_or_default(self) -> Singleton<T, L, B>
795    where
796        T: Default + Clone,
797    {
798        self.into_singleton().map(q!(|v| v.unwrap_or_default()))
799    }
800
801    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
802    ///
803    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
804    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
805    /// so that Hydro can skip any computation on null values.
806    ///
807    /// # Example
808    /// ```rust
809    /// # #[cfg(feature = "deploy")] {
810    /// # use hydro_lang::prelude::*;
811    /// # use futures::StreamExt;
812    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
813    /// let tick = process.tick();
814    /// // ticks are lazy by default, forces the later ticks to run
815    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
816    ///
817    /// let some_first_tick = tick.optional_first_tick(q!(123));
818    /// some_first_tick.into_singleton().all_ticks()
819    /// # }, |mut stream| async move {
820    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
821    /// # for w in vec![Some(123), None, None, None] {
822    /// #     assert_eq!(stream.next().await.unwrap(), w);
823    /// # }
824    /// # }));
825    /// # }
826    /// ```
827    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
828    where
829        T: Clone,
830    {
831        let none: syn::Expr = parse_quote!(::std::option::Option::None);
832
833        let none_singleton = Singleton::new(
834            self.location.clone(),
835            HydroNode::SingletonSource {
836                value: none.into(),
837                first_tick_only: false,
838                metadata: self
839                    .location
840                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
841            },
842        );
843
844        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
845    }
846
847    /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
848    ///
849    /// # Example
850    /// ```rust
851    /// # #[cfg(feature = "deploy")] {
852    /// # use hydro_lang::prelude::*;
853    /// # use futures::StreamExt;
854    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
855    /// let tick = process.tick();
856    /// // ticks are lazy by default, forces the second tick to run
857    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
858    ///
859    /// let some_first_tick = tick.optional_first_tick(q!(42));
860    /// some_first_tick.is_some().all_ticks()
861    /// # }, |mut stream| async move {
862    /// // [true /* first tick */, false /* second tick */, ...]
863    /// # for w in vec![true, false] {
864    /// #     assert_eq!(stream.next().await.unwrap(), w);
865    /// # }
866    /// # }));
867    /// # }
868    /// ```
869    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
870    pub fn is_some(self) -> Singleton<bool, L, B> {
871        self.map(q!(|_| ()))
872            .into_singleton()
873            .map(q!(|o| o.is_some()))
874    }
875
876    /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
877    ///
878    /// # Example
879    /// ```rust
880    /// # #[cfg(feature = "deploy")] {
881    /// # use hydro_lang::prelude::*;
882    /// # use futures::StreamExt;
883    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
884    /// let tick = process.tick();
885    /// // ticks are lazy by default, forces the second tick to run
886    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
887    ///
888    /// let some_first_tick = tick.optional_first_tick(q!(42));
889    /// some_first_tick.is_none().all_ticks()
890    /// # }, |mut stream| async move {
891    /// // [false /* first tick */, true /* second tick */, ...]
892    /// # for w in vec![false, true] {
893    /// #     assert_eq!(stream.next().await.unwrap(), w);
894    /// # }
895    /// # }));
896    /// # }
897    /// ```
898    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
899    pub fn is_none(self) -> Singleton<bool, L, B> {
900        self.map(q!(|_| ()))
901            .into_singleton()
902            .map(q!(|o| o.is_none()))
903    }
904
905    /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
906    /// values are equal, `false` otherwise (including when either is null).
907    ///
908    /// # Example
909    /// ```rust
910    /// # #[cfg(feature = "deploy")] {
911    /// # use hydro_lang::prelude::*;
912    /// # use futures::StreamExt;
913    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
914    /// let tick = process.tick();
915    /// // ticks are lazy by default, forces the second tick to run
916    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
917    ///
918    /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
919    /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
920    /// a.is_some_and_equals(b).all_ticks()
921    /// # }, |mut stream| async move {
922    /// // [true, false]
923    /// # for w in vec![true, false] {
924    /// #     assert_eq!(stream.next().await.unwrap(), w);
925    /// # }
926    /// # }));
927    /// # }
928    /// ```
929    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
930    pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
931    where
932        T: PartialEq + Clone,
933        B: IsBounded,
934    {
935        self.into_singleton()
936            .zip(other.into_singleton())
937            .map(q!(|(a, b)| a.is_some() && a == b))
938    }
939
940    /// An operator which allows you to "name" a `HydroNode`.
941    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
942    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
943        {
944            let mut node = self.ir_node.borrow_mut();
945            let metadata = node.metadata_mut();
946            metadata.tag = Some(name.to_owned());
947        }
948        self
949    }
950
951    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
952    /// implies that `B == Bounded`.
953    pub fn make_bounded(self) -> Optional<T, L, Bounded>
954    where
955        B: IsBounded,
956    {
957        Optional::new(
958            self.location.clone(),
959            self.ir_node.replace(HydroNode::Placeholder),
960        )
961    }
962
963    /// Clones this bounded optional into a tick, returning a optional that has the
964    /// same value as the outer optional. Because the outer optional is bounded, this
965    /// is deterministic because there is only a single immutable version.
966    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
967    where
968        B: IsBounded,
969        T: Clone,
970    {
971        // TODO(shadaj): avoid printing simulator logs for this snapshot
972        self.snapshot(
973            tick,
974            nondet!(/** bounded top-level optional so deterministic */),
975        )
976    }
977
978    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
979    /// non-null. Otherwise, the stream is empty.
980    ///
981    /// # Example
982    /// ```rust
983    /// # #[cfg(feature = "deploy")] {
984    /// # use hydro_lang::prelude::*;
985    /// # use futures::StreamExt;
986    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
987    /// # let tick = process.tick();
988    /// # // ticks are lazy by default, forces the second tick to run
989    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
990    /// # let batch_first_tick = process
991    /// #   .source_iter(q!(vec![]))
992    /// #   .batch(&tick, nondet!(/** test */));
993    /// # let batch_second_tick = process
994    /// #   .source_iter(q!(vec![123, 456]))
995    /// #   .batch(&tick, nondet!(/** test */))
996    /// #   .defer_tick(); // appears on the second tick
997    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
998    /// input_batch // first tick: [], second tick: [123, 456]
999    ///     .clone()
1000    ///     .max()
1001    ///     .into_stream()
1002    ///     .chain(input_batch)
1003    ///     .all_ticks()
1004    /// # }, |mut stream| async move {
1005    /// // [456, 123, 456]
1006    /// # for w in vec![456, 123, 456] {
1007    /// #     assert_eq!(stream.next().await.unwrap(), w);
1008    /// # }
1009    /// # }));
1010    /// # }
1011    /// ```
1012    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1013    where
1014        B: IsBounded,
1015    {
1016        Stream::new(
1017            self.location.clone(),
1018            HydroNode::Cast {
1019                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1020                metadata: self.location.new_node_metadata(Stream::<
1021                    T,
1022                    Tick<L>,
1023                    Bounded,
1024                    TotalOrder,
1025                    ExactlyOnce,
1026                >::collection_kind()),
1027            },
1028        )
1029    }
1030
1031    /// Filters this optional, passing through the value if the boolean signal is `true`,
1032    /// otherwise the output is null.
1033    ///
1034    /// # Example
1035    /// ```rust
1036    /// # #[cfg(feature = "deploy")] {
1037    /// # use hydro_lang::prelude::*;
1038    /// # use futures::StreamExt;
1039    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1040    /// let tick = process.tick();
1041    /// // ticks are lazy by default, forces the second tick to run
1042    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1043    ///
1044    /// let some_first_tick = tick.optional_first_tick(q!(()));
1045    /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1046    /// let batch_first_tick = process
1047    ///   .source_iter(q!(vec![456]))
1048    ///   .batch(&tick, nondet!(/** test */));
1049    /// let batch_second_tick = process
1050    ///   .source_iter(q!(vec![789]))
1051    ///   .batch(&tick, nondet!(/** test */))
1052    ///   .defer_tick();
1053    /// batch_first_tick.chain(batch_second_tick).first()
1054    ///   .filter_if(signal)
1055    ///   .unwrap_or(tick.singleton(q!(0)))
1056    ///   .all_ticks()
1057    /// # }, |mut stream| async move {
1058    /// // [456, 0]
1059    /// # for w in vec![456, 0] {
1060    /// #     assert_eq!(stream.next().await.unwrap(), w);
1061    /// # }
1062    /// # }));
1063    /// # }
1064    /// ```
1065    pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1066    where
1067        B: IsBounded,
1068    {
1069        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1070    }
1071
1072    /// Filters this optional, passing through the optional value if it is non-null **and** the
1073    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1074    ///
1075    /// Useful for conditionally processing, such as only emitting an optional's value outside
1076    /// a tick if some other condition is satisfied.
1077    ///
1078    /// # Example
1079    /// ```rust
1080    /// # #[cfg(feature = "deploy")] {
1081    /// # use hydro_lang::prelude::*;
1082    /// # use futures::StreamExt;
1083    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1084    /// let tick = process.tick();
1085    /// // ticks are lazy by default, forces the second tick to run
1086    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1087    ///
1088    /// let batch_first_tick = process
1089    ///   .source_iter(q!(vec![]))
1090    ///   .batch(&tick, nondet!(/** test */));
1091    /// let batch_second_tick = process
1092    ///   .source_iter(q!(vec![456]))
1093    ///   .batch(&tick, nondet!(/** test */))
1094    ///   .defer_tick(); // appears on the second tick
1095    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1096    /// batch_first_tick.chain(batch_second_tick).first()
1097    ///   .filter_if_some(some_on_first_tick)
1098    ///   .unwrap_or(tick.singleton(q!(789)))
1099    ///   .all_ticks()
1100    /// # }, |mut stream| async move {
1101    /// // [789, 789]
1102    /// # for w in vec![789, 789] {
1103    /// #     assert_eq!(stream.next().await.unwrap(), w);
1104    /// # }
1105    /// # }));
1106    /// # }
1107    /// ```
1108    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1109    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1110    where
1111        B: IsBounded,
1112    {
1113        self.filter_if(signal.is_some())
1114    }
1115
1116    /// Filters this optional, passing through the optional value if it is non-null **and** the
1117    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1118    ///
1119    /// Useful for conditionally processing, such as only emitting an optional's value outside
1120    /// a tick if some other condition is satisfied.
1121    ///
1122    /// # Example
1123    /// ```rust
1124    /// # #[cfg(feature = "deploy")] {
1125    /// # use hydro_lang::prelude::*;
1126    /// # use futures::StreamExt;
1127    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1128    /// let tick = process.tick();
1129    /// // ticks are lazy by default, forces the second tick to run
1130    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1131    ///
1132    /// let batch_first_tick = process
1133    ///   .source_iter(q!(vec![]))
1134    ///   .batch(&tick, nondet!(/** test */));
1135    /// let batch_second_tick = process
1136    ///   .source_iter(q!(vec![456]))
1137    ///   .batch(&tick, nondet!(/** test */))
1138    ///   .defer_tick(); // appears on the second tick
1139    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1140    /// batch_first_tick.chain(batch_second_tick).first()
1141    ///   .filter_if_none(some_on_first_tick)
1142    ///   .unwrap_or(tick.singleton(q!(789)))
1143    ///   .all_ticks()
1144    /// # }, |mut stream| async move {
1145    /// // [789, 789]
1146    /// # for w in vec![789, 456] {
1147    /// #     assert_eq!(stream.next().await.unwrap(), w);
1148    /// # }
1149    /// # }));
1150    /// # }
1151    /// ```
1152    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1153    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1154    where
1155        B: IsBounded,
1156    {
1157        self.filter_if(other.is_none())
1158    }
1159
1160    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1161    ///
1162    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1163    /// having a value, such as only releasing a piece of state if the node is the leader.
1164    ///
1165    /// # Example
1166    /// ```rust
1167    /// # #[cfg(feature = "deploy")] {
1168    /// # use hydro_lang::prelude::*;
1169    /// # use futures::StreamExt;
1170    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1171    /// let tick = process.tick();
1172    /// // ticks are lazy by default, forces the second tick to run
1173    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1174    ///
1175    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1176    /// some_on_first_tick
1177    ///     .if_some_then(tick.singleton(q!(456)))
1178    ///     .unwrap_or(tick.singleton(q!(123)))
1179    /// # .all_ticks()
1180    /// # }, |mut stream| async move {
1181    /// // 456 (first tick) ~> 123 (second tick onwards)
1182    /// # for w in vec![456, 123, 123] {
1183    /// #     assert_eq!(stream.next().await.unwrap(), w);
1184    /// # }
1185    /// # }));
1186    /// # }
1187    /// ```
1188    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1189    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1190    where
1191        B: IsBounded,
1192    {
1193        value.filter_if(self.is_some())
1194    }
1195}
1196
1197impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1198where
1199    L: Location<'a> + NoTick,
1200{
1201    /// Returns an optional value corresponding to the latest snapshot of the optional
1202    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1203    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1204    /// all snapshots of this optional into the atomic-associated tick will observe the
1205    /// same value each tick.
1206    ///
1207    /// # Non-Determinism
1208    /// Because this picks a snapshot of a optional whose value is continuously changing,
1209    /// the output optional has a non-deterministic value since the snapshot can be at an
1210    /// arbitrary point in time.
1211    pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1212        Optional::new(
1213            self.location.clone().tick,
1214            HydroNode::Batch {
1215                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1216                metadata: self
1217                    .location
1218                    .tick
1219                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1220            },
1221        )
1222    }
1223
1224    /// Returns this optional back into a top-level, asynchronous execution context where updates
1225    /// to the value will be asynchronously propagated.
1226    pub fn end_atomic(self) -> Optional<T, L, B> {
1227        Optional::new(
1228            self.location.tick.l.clone(),
1229            HydroNode::EndAtomic {
1230                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1231                metadata: self
1232                    .location
1233                    .tick
1234                    .l
1235                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1236            },
1237        )
1238    }
1239}
1240
1241impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1242where
1243    L: Location<'a>,
1244{
1245    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1246    /// will observe the same version of the value and will be executed synchronously before any
1247    /// outputs are yielded (in [`Optional::end_atomic`]).
1248    ///
1249    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1250    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1251    /// a different version).
1252    ///
1253    /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
1254    /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
1255    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
1256    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
1257        let out_location = Atomic { tick: tick.clone() };
1258        Optional::new(
1259            out_location.clone(),
1260            HydroNode::BeginAtomic {
1261                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1262                metadata: out_location
1263                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1264            },
1265        )
1266    }
1267
1268    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1269    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1270    /// relevant data that contributed to the snapshot at tick `t`.
1271    ///
1272    /// # Non-Determinism
1273    /// Because this picks a snapshot of a optional whose value is continuously changing,
1274    /// the output optional has a non-deterministic value since the snapshot can be at an
1275    /// arbitrary point in time.
1276    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1277        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1278        Optional::new(
1279            tick.clone(),
1280            HydroNode::Batch {
1281                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1282                metadata: tick
1283                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1284            },
1285        )
1286    }
1287
1288    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1289    /// with order corresponding to increasing prefixes of data contributing to the optional.
1290    ///
1291    /// # Non-Determinism
1292    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1293    /// to non-deterministic batching and arrival of inputs, the output stream is
1294    /// non-deterministic.
1295    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1296    where
1297        L: NoTick,
1298    {
1299        let tick = self.location.tick();
1300        self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1301    }
1302
1303    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1304    /// value taken at various points in time. Because the input optional may be
1305    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1306    /// represent the value of the optional given some prefix of the streams leading up to
1307    /// it.
1308    ///
1309    /// # Non-Determinism
1310    /// The output stream is non-deterministic in which elements are sampled, since this
1311    /// is controlled by a clock.
1312    pub fn sample_every(
1313        self,
1314        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1315        nondet: NonDet,
1316    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1317    where
1318        L: NoTick + NoAtomic,
1319    {
1320        let samples = self.location.source_interval(interval, nondet);
1321        let tick = self.location.tick();
1322
1323        self.snapshot(&tick, nondet)
1324            .filter_if(samples.batch(&tick, nondet).first().is_some())
1325            .all_ticks()
1326            .weaken_retries()
1327    }
1328}
1329
1330impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1331where
1332    L: Location<'a>,
1333{
1334    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1335    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1336    /// null values).
1337    ///
1338    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1339    /// producing one element in the output for each (non-null) tick. This is useful for batched
1340    /// computations, where the results from each tick must be combined together.
1341    ///
1342    /// # Example
1343    /// ```rust
1344    /// # #[cfg(feature = "deploy")] {
1345    /// # use hydro_lang::prelude::*;
1346    /// # use futures::StreamExt;
1347    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1348    /// # let tick = process.tick();
1349    /// # // ticks are lazy by default, forces the second tick to run
1350    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1351    /// # let batch_first_tick = process
1352    /// #   .source_iter(q!(vec![]))
1353    /// #   .batch(&tick, nondet!(/** test */));
1354    /// # let batch_second_tick = process
1355    /// #   .source_iter(q!(vec![1, 2, 3]))
1356    /// #   .batch(&tick, nondet!(/** test */))
1357    /// #   .defer_tick(); // appears on the second tick
1358    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1359    /// input_batch // first tick: [], second tick: [1, 2, 3]
1360    ///     .max()
1361    ///     .all_ticks()
1362    /// # }, |mut stream| async move {
1363    /// // [3]
1364    /// # for w in vec![3] {
1365    /// #     assert_eq!(stream.next().await.unwrap(), w);
1366    /// # }
1367    /// # }));
1368    /// # }
1369    /// ```
1370    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1371        self.into_stream().all_ticks()
1372    }
1373
1374    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1375    /// which will stream the value computed in _each_ tick as a separate stream element.
1376    ///
1377    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1378    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1379    /// optional's [`Tick`] context.
1380    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1381        self.into_stream().all_ticks_atomic()
1382    }
1383
1384    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1385    /// be asynchronously updated with the latest value of the optional inside the tick, including
1386    /// whether the optional is null or not.
1387    ///
1388    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1389    /// tick that tracks the inner value. This is useful for getting the value as of the
1390    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1391    ///
1392    /// # Example
1393    /// ```rust
1394    /// # #[cfg(feature = "deploy")] {
1395    /// # use hydro_lang::prelude::*;
1396    /// # use futures::StreamExt;
1397    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1398    /// # let tick = process.tick();
1399    /// # // ticks are lazy by default, forces the second tick to run
1400    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1401    /// # let batch_first_tick = process
1402    /// #   .source_iter(q!(vec![]))
1403    /// #   .batch(&tick, nondet!(/** test */));
1404    /// # let batch_second_tick = process
1405    /// #   .source_iter(q!(vec![1, 2, 3]))
1406    /// #   .batch(&tick, nondet!(/** test */))
1407    /// #   .defer_tick(); // appears on the second tick
1408    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1409    /// input_batch // first tick: [], second tick: [1, 2, 3]
1410    ///     .max()
1411    ///     .latest()
1412    /// # .into_singleton()
1413    /// # .sample_eager(nondet!(/** test */))
1414    /// # }, |mut stream| async move {
1415    /// // asynchronously changes from None ~> 3
1416    /// # for w in vec![None, Some(3)] {
1417    /// #     assert_eq!(stream.next().await.unwrap(), w);
1418    /// # }
1419    /// # }));
1420    /// # }
1421    /// ```
1422    pub fn latest(self) -> Optional<T, L, Unbounded> {
1423        Optional::new(
1424            self.location.outer().clone(),
1425            HydroNode::YieldConcat {
1426                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1427                metadata: self
1428                    .location
1429                    .outer()
1430                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1431            },
1432        )
1433    }
1434
1435    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1436    /// be updated with the latest value of the optional inside the tick.
1437    ///
1438    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1439    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1440    /// optional's [`Tick`] context.
1441    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1442        let out_location = Atomic {
1443            tick: self.location.clone(),
1444        };
1445
1446        Optional::new(
1447            out_location.clone(),
1448            HydroNode::YieldConcat {
1449                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1450                metadata: out_location
1451                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1452            },
1453        )
1454    }
1455
1456    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1457    /// always has the state of `self` at tick `T - 1`.
1458    ///
1459    /// At tick `0`, the output optional is null, since there is no previous tick.
1460    ///
1461    /// This operator enables stateful iterative processing with ticks, by sending data from one
1462    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1463    ///
1464    /// # Example
1465    /// ```rust
1466    /// # #[cfg(feature = "deploy")] {
1467    /// # use hydro_lang::prelude::*;
1468    /// # use futures::StreamExt;
1469    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1470    /// let tick = process.tick();
1471    /// // ticks are lazy by default, forces the second tick to run
1472    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1473    ///
1474    /// let batch_first_tick = process
1475    ///   .source_iter(q!(vec![1, 2]))
1476    ///   .batch(&tick, nondet!(/** test */));
1477    /// let batch_second_tick = process
1478    ///   .source_iter(q!(vec![3, 4]))
1479    ///   .batch(&tick, nondet!(/** test */))
1480    ///   .defer_tick(); // appears on the second tick
1481    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1482    ///   .reduce(q!(|state, v| *state += v));
1483    ///
1484    /// current_tick_sum.clone().into_singleton().zip(
1485    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1486    /// ).all_ticks()
1487    /// # }, |mut stream| async move {
1488    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1489    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1490    /// #     assert_eq!(stream.next().await.unwrap(), w);
1491    /// # }
1492    /// # }));
1493    /// # }
1494    /// ```
1495    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1496        Optional::new(
1497            self.location.clone(),
1498            HydroNode::DeferTick {
1499                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1500                metadata: self.location.new_node_metadata(Self::collection_kind()),
1501            },
1502        )
1503    }
1504}
1505
1506#[cfg(test)]
1507mod tests {
1508    #[cfg(feature = "deploy")]
1509    use futures::StreamExt;
1510    #[cfg(feature = "deploy")]
1511    use hydro_deploy::Deployment;
1512    #[cfg(any(feature = "deploy", feature = "sim"))]
1513    use stageleft::q;
1514
1515    #[cfg(feature = "deploy")]
1516    use super::Optional;
1517    #[cfg(any(feature = "deploy", feature = "sim"))]
1518    use crate::compile::builder::FlowBuilder;
1519    #[cfg(any(feature = "deploy", feature = "sim"))]
1520    use crate::location::Location;
1521    #[cfg(feature = "deploy")]
1522    use crate::nondet::nondet;
1523
1524    #[cfg(feature = "deploy")]
1525    #[tokio::test]
1526    async fn optional_or_cardinality() {
1527        let mut deployment = Deployment::new();
1528
1529        let mut flow = FlowBuilder::new();
1530        let node = flow.process::<()>();
1531        let external = flow.external::<()>();
1532
1533        let node_tick = node.tick();
1534        let tick_singleton = node_tick.singleton(q!(123));
1535        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1536        let counts = tick_optional_inhabited
1537            .clone()
1538            .or(tick_optional_inhabited)
1539            .into_stream()
1540            .count()
1541            .all_ticks()
1542            .send_bincode_external(&external);
1543
1544        let nodes = flow
1545            .with_process(&node, deployment.Localhost())
1546            .with_external(&external, deployment.Localhost())
1547            .deploy(&mut deployment);
1548
1549        deployment.deploy().await.unwrap();
1550
1551        let mut external_out = nodes.connect(counts).await;
1552
1553        deployment.start().await.unwrap();
1554
1555        assert_eq!(external_out.next().await.unwrap(), 1);
1556    }
1557
1558    #[cfg(feature = "deploy")]
1559    #[tokio::test]
1560    async fn into_singleton_top_level_none_cardinality() {
1561        let mut deployment = Deployment::new();
1562
1563        let mut flow = FlowBuilder::new();
1564        let node = flow.process::<()>();
1565        let external = flow.external::<()>();
1566
1567        let node_tick = node.tick();
1568        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1569        let into_singleton = top_level_none.into_singleton();
1570
1571        let tick_driver = node.spin();
1572
1573        let counts = into_singleton
1574            .snapshot(&node_tick, nondet!(/** test */))
1575            .into_stream()
1576            .count()
1577            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1578            .map(q!(|(c, _)| c))
1579            .all_ticks()
1580            .send_bincode_external(&external);
1581
1582        let nodes = flow
1583            .with_process(&node, deployment.Localhost())
1584            .with_external(&external, deployment.Localhost())
1585            .deploy(&mut deployment);
1586
1587        deployment.deploy().await.unwrap();
1588
1589        let mut external_out = nodes.connect(counts).await;
1590
1591        deployment.start().await.unwrap();
1592
1593        assert_eq!(external_out.next().await.unwrap(), 1);
1594        assert_eq!(external_out.next().await.unwrap(), 1);
1595        assert_eq!(external_out.next().await.unwrap(), 1);
1596    }
1597
1598    #[cfg(feature = "deploy")]
1599    #[tokio::test]
1600    async fn into_singleton_unbounded_top_level_none_cardinality() {
1601        let mut deployment = Deployment::new();
1602
1603        let mut flow = FlowBuilder::new();
1604        let node = flow.process::<()>();
1605        let external = flow.external::<()>();
1606
1607        let node_tick = node.tick();
1608        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1609        let into_singleton = top_level_none.into_singleton();
1610
1611        let tick_driver = node.spin();
1612
1613        let counts = into_singleton
1614            .snapshot(&node_tick, nondet!(/** test */))
1615            .into_stream()
1616            .count()
1617            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1618            .map(q!(|(c, _)| c))
1619            .all_ticks()
1620            .send_bincode_external(&external);
1621
1622        let nodes = flow
1623            .with_process(&node, deployment.Localhost())
1624            .with_external(&external, deployment.Localhost())
1625            .deploy(&mut deployment);
1626
1627        deployment.deploy().await.unwrap();
1628
1629        let mut external_out = nodes.connect(counts).await;
1630
1631        deployment.start().await.unwrap();
1632
1633        assert_eq!(external_out.next().await.unwrap(), 1);
1634        assert_eq!(external_out.next().await.unwrap(), 1);
1635        assert_eq!(external_out.next().await.unwrap(), 1);
1636    }
1637
1638    #[cfg(feature = "sim")]
1639    #[test]
1640    fn top_level_optional_some_into_stream_no_replay() {
1641        let mut flow = FlowBuilder::new();
1642        let node = flow.process::<()>();
1643
1644        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1645        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1646        let filtered_some = folded.filter(q!(|_| true));
1647
1648        let out_recv = filtered_some.into_stream().sim_output();
1649
1650        flow.sim().exhaustive(async || {
1651            out_recv.assert_yields_only([10]).await;
1652        });
1653    }
1654
1655    #[cfg(feature = "sim")]
1656    #[test]
1657    fn top_level_optional_none_into_stream_no_replay() {
1658        let mut flow = FlowBuilder::new();
1659        let node = flow.process::<()>();
1660
1661        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1662        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1663        let filtered_none = folded.filter(q!(|_| false));
1664
1665        let out_recv = filtered_none.into_stream().sim_output();
1666
1667        flow.sim().exhaustive(async || {
1668            out_recv.assert_yields_only([] as [i32; 0]).await;
1669        });
1670    }
1671}