hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::NonDet;
23
/// A *nullable* Rust value that can asynchronously change over time.
///
/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
///
/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
///
/// Type Parameters:
/// - `Type`: the type of the value in this optional (when it is not null)
/// - `Loc`: the [`Location`] where the optional is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    // Location at which this optional is materialized (see the `Loc` type parameter).
    pub(crate) location: Loc,
    // IR node backing this optional. It sits in a `RefCell` so that `Clone::clone`, which
    // only receives `&self`, can swap the node for a `Tee` in place.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Zero-sized marker that ties the type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
43
/// Lets a tick-bound, [`Bounded`] [`Optional`] be used wherever a [`DeferTick`] collection
/// is required.
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): presumably resolves to an inherent `Optional::defer_tick` defined
        // elsewhere in this file rather than to this trait method — confirm.
        Optional::defer_tick(self)
    }
}
52
53impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
54where
55    L: Location<'a>,
56{
57    type Location = Tick<L>;
58
59    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
60        Optional::new(
61            location.clone(),
62            HydroNode::CycleSource {
63                ident,
64                metadata: location.new_node_metadata::<T>(),
65            },
66        )
67    }
68}
69
70impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
71where
72    L: Location<'a>,
73{
74    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75        assert_eq!(
76            Location::id(&self.location),
77            expected_location,
78            "locations do not match"
79        );
80        self.location
81            .flow_state()
82            .borrow_mut()
83            .push_root(HydroRoot::CycleSink {
84                ident,
85                input: Box::new(self.ir_node.into_inner()),
86                out_location: Location::id(&self.location),
87                op_metadata: HydroIrOpMetadata::new(),
88            });
89    }
90}
91
92impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
93where
94    L: Location<'a>,
95{
96    type Location = Tick<L>;
97
98    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
99        Optional::new(
100            location.clone(),
101            HydroNode::CycleSource {
102                ident,
103                metadata: location.new_node_metadata::<T>(),
104            },
105        )
106    }
107}
108
109impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
110where
111    L: Location<'a>,
112{
113    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114        assert_eq!(
115            Location::id(&self.location),
116            expected_location,
117            "locations do not match"
118        );
119        self.location
120            .flow_state()
121            .borrow_mut()
122            .push_root(HydroRoot::CycleSink {
123                ident,
124                input: Box::new(self.ir_node.into_inner()),
125                out_location: Location::id(&self.location),
126                op_metadata: HydroIrOpMetadata::new(),
127            });
128    }
129}
130
131impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
132where
133    L: Location<'a> + NoTick,
134{
135    type Location = L;
136
137    fn create_source(ident: syn::Ident, location: L) -> Self {
138        Optional::new(
139            location.clone(),
140            HydroNode::Persist {
141                inner: Box::new(HydroNode::CycleSource {
142                    ident,
143                    metadata: location.new_node_metadata::<T>(),
144                }),
145                metadata: location.new_node_metadata::<T>(),
146            },
147        )
148    }
149}
150
151impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
152where
153    L: Location<'a> + NoTick,
154{
155    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
156        assert_eq!(
157            Location::id(&self.location),
158            expected_location,
159            "locations do not match"
160        );
161        let metadata = self.location.new_node_metadata::<T>();
162        self.location
163            .flow_state()
164            .borrow_mut()
165            .push_root(HydroRoot::CycleSink {
166                ident,
167                input: Box::new(HydroNode::Unpersist {
168                    inner: Box::new(self.ir_node.into_inner()),
169                    metadata: metadata.clone(),
170                }),
171                out_location: Location::id(&self.location),
172                op_metadata: HydroIrOpMetadata::new(),
173            });
174    }
175}
176
177impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
178where
179    L: Location<'a>,
180{
181    fn from(singleton: Optional<T, L, Bounded>) -> Self {
182        Optional::new(singleton.location, singleton.ir_node.into_inner())
183    }
184}
185
186impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
187where
188    L: Location<'a>,
189{
190    fn from(singleton: Singleton<T, L, B>) -> Self {
191        Optional::new(singleton.location, singleton.ir_node.into_inner())
192    }
193}
194
195impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
196where
197    T: Clone,
198    L: Location<'a>,
199{
200    fn clone(&self) -> Self {
201        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
202            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
203            *self.ir_node.borrow_mut() = HydroNode::Tee {
204                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
205                metadata: self.location.new_node_metadata::<T>(),
206            };
207        }
208
209        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
210            Optional {
211                location: self.location.clone(),
212                ir_node: HydroNode::Tee {
213                    inner: TeeNode(inner.0.clone()),
214                    metadata: metadata.clone(),
215                }
216                .into(),
217                _phantom: PhantomData,
218            }
219        } else {
220            unreachable!()
221        }
222    }
223}
224
225impl<'a, T, L, B: Boundedness> Optional<T, L, B>
226where
227    L: Location<'a>,
228{
229    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
230        Optional {
231            location,
232            ir_node: RefCell::new(ir_node),
233            _phantom: PhantomData,
234        }
235    }
236
237    /// Transforms the optional value by applying a function `f` to it,
238    /// continuously as the input is updated.
239    ///
240    /// Whenever the optional is empty, the output optional is also empty.
241    ///
242    /// # Example
243    /// ```rust
244    /// # use hydro_lang::prelude::*;
245    /// # use futures::StreamExt;
246    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
247    /// let tick = process.tick();
248    /// let optional = tick.optional_first_tick(q!(1));
249    /// optional.map(q!(|v| v + 1)).all_ticks()
250    /// # }, |mut stream| async move {
251    /// // 2
252    /// # assert_eq!(stream.next().await.unwrap(), 2);
253    /// # }));
254    /// ```
255    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
256    where
257        F: Fn(T) -> U + 'a,
258    {
259        let f = f.splice_fn1_ctx(&self.location).into();
260        Optional::new(
261            self.location.clone(),
262            HydroNode::Map {
263                f,
264                input: Box::new(self.ir_node.into_inner()),
265                metadata: self.location.new_node_metadata::<U>(),
266            },
267        )
268    }
269
270    /// Transforms the optional value by applying a function `f` to it and then flattening
271    /// the result into a stream, preserving the order of elements.
272    ///
273    /// If the optional is empty, the output stream is also empty. If the optional contains
274    /// a value, `f` is applied to produce an iterator, and all items from that iterator
275    /// are emitted in the output stream in deterministic order.
276    ///
277    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
278    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
279    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
280    ///
281    /// # Example
282    /// ```rust
283    /// # use hydro_lang::prelude::*;
284    /// # use futures::StreamExt;
285    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
286    /// let tick = process.tick();
287    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
288    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
289    /// # }, |mut stream| async move {
290    /// // 1, 2, 3
291    /// # for w in vec![1, 2, 3] {
292    /// #     assert_eq!(stream.next().await.unwrap(), w);
293    /// # }
294    /// # }));
295    /// ```
296    pub fn flat_map_ordered<U, I, F>(
297        self,
298        f: impl IntoQuotedMut<'a, F, L>,
299    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
300    where
301        I: IntoIterator<Item = U>,
302        F: Fn(T) -> I + 'a,
303    {
304        let f = f.splice_fn1_ctx(&self.location).into();
305        Stream::new(
306            self.location.clone(),
307            HydroNode::FlatMap {
308                f,
309                input: Box::new(self.ir_node.into_inner()),
310                metadata: self.location.new_node_metadata::<U>(),
311            },
312        )
313    }
314
315    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
316    /// for the output type `I` to produce items in any order.
317    ///
318    /// If the optional is empty, the output stream is also empty. If the optional contains
319    /// a value, `f` is applied to produce an iterator, and all items from that iterator
320    /// are emitted in the output stream in non-deterministic order.
321    ///
322    /// # Example
323    /// ```rust
324    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
325    /// # use futures::StreamExt;
326    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
327    /// let tick = process.tick();
328    /// let optional = tick.optional_first_tick(q!(
329    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
330    /// ));
331    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
332    /// # }, |mut stream| async move {
333    /// // 1, 2, 3, but in no particular order
334    /// # let mut results = Vec::new();
335    /// # for _ in 0..3 {
336    /// #     results.push(stream.next().await.unwrap());
337    /// # }
338    /// # results.sort();
339    /// # assert_eq!(results, vec![1, 2, 3]);
340    /// # }));
341    /// ```
342    pub fn flat_map_unordered<U, I, F>(
343        self,
344        f: impl IntoQuotedMut<'a, F, L>,
345    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
346    where
347        I: IntoIterator<Item = U>,
348        F: Fn(T) -> I + 'a,
349    {
350        let f = f.splice_fn1_ctx(&self.location).into();
351        Stream::new(
352            self.location.clone(),
353            HydroNode::FlatMap {
354                f,
355                input: Box::new(self.ir_node.into_inner()),
356                metadata: self.location.new_node_metadata::<U>(),
357            },
358        )
359    }
360
361    /// Flattens the optional value into a stream, preserving the order of elements.
362    ///
363    /// If the optional is empty, the output stream is also empty. If the optional contains
364    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
365    /// in the output stream in deterministic order.
366    ///
367    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
368    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
369    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
370    ///
371    /// # Example
372    /// ```rust
373    /// # use hydro_lang::prelude::*;
374    /// # use futures::StreamExt;
375    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
376    /// let tick = process.tick();
377    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
378    /// optional.flatten_ordered().all_ticks()
379    /// # }, |mut stream| async move {
380    /// // 1, 2, 3
381    /// # for w in vec![1, 2, 3] {
382    /// #     assert_eq!(stream.next().await.unwrap(), w);
383    /// # }
384    /// # }));
385    /// ```
386    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
387    where
388        T: IntoIterator<Item = U>,
389    {
390        self.flat_map_ordered(q!(|v| v))
391    }
392
393    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
394    /// for the element type `T` to produce items in any order.
395    ///
396    /// If the optional is empty, the output stream is also empty. If the optional contains
397    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
398    /// in the output stream in non-deterministic order.
399    ///
400    /// # Example
401    /// ```rust
402    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
403    /// # use futures::StreamExt;
404    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
405    /// let tick = process.tick();
406    /// let optional = tick.optional_first_tick(q!(
407    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
408    /// ));
409    /// optional.flatten_unordered().all_ticks()
410    /// # }, |mut stream| async move {
411    /// // 1, 2, 3, but in no particular order
412    /// # let mut results = Vec::new();
413    /// # for _ in 0..3 {
414    /// #     results.push(stream.next().await.unwrap());
415    /// # }
416    /// # results.sort();
417    /// # assert_eq!(results, vec![1, 2, 3]);
418    /// # }));
419    /// ```
420    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
421    where
422        T: IntoIterator<Item = U>,
423    {
424        self.flat_map_unordered(q!(|v| v))
425    }
426
427    /// Creates an optional containing only the value if it satisfies a predicate `f`.
428    ///
429    /// If the optional is empty, the output optional is also empty. If the optional contains
430    /// a value and the predicate returns `true`, the output optional contains the same value.
431    /// If the predicate returns `false`, the output optional is empty.
432    ///
433    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
434    /// not modify or take ownership of the value. If you need to modify the value while filtering
435    /// use [`Optional::filter_map`] instead.
436    ///
437    /// # Example
438    /// ```rust
439    /// # use hydro_lang::prelude::*;
440    /// # use futures::StreamExt;
441    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
442    /// let tick = process.tick();
443    /// let optional = tick.optional_first_tick(q!(5));
444    /// optional.filter(q!(|&x| x > 3)).all_ticks()
445    /// # }, |mut stream| async move {
446    /// // 5
447    /// # assert_eq!(stream.next().await.unwrap(), 5);
448    /// # }));
449    /// ```
450    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
451    where
452        F: Fn(&T) -> bool + 'a,
453    {
454        let f = f.splice_fn1_borrow_ctx(&self.location).into();
455        Optional::new(
456            self.location.clone(),
457            HydroNode::Filter {
458                f,
459                input: Box::new(self.ir_node.into_inner()),
460                metadata: self.location.new_node_metadata::<T>(),
461            },
462        )
463    }
464
465    /// An operator that both filters and maps. It yields only the value if the supplied
466    /// closure `f` returns `Some(value)`.
467    ///
468    /// If the optional is empty, the output optional is also empty. If the optional contains
469    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
470    /// If the closure returns `None`, the output optional is empty.
471    ///
472    /// # Example
473    /// ```rust
474    /// # use hydro_lang::prelude::*;
475    /// # use futures::StreamExt;
476    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
477    /// let tick = process.tick();
478    /// let optional = tick.optional_first_tick(q!("42"));
479    /// optional
480    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
481    ///     .all_ticks()
482    /// # }, |mut stream| async move {
483    /// // 42
484    /// # assert_eq!(stream.next().await.unwrap(), 42);
485    /// # }));
486    /// ```
487    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
488    where
489        F: Fn(T) -> Option<U> + 'a,
490    {
491        let f = f.splice_fn1_ctx(&self.location).into();
492        Optional::new(
493            self.location.clone(),
494            HydroNode::FilterMap {
495                f,
496                input: Box::new(self.ir_node.into_inner()),
497                metadata: self.location.new_node_metadata::<U>(),
498            },
499        )
500    }
501
502    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
503    ///
504    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
505    /// non-null. This is useful for combining several pieces of state together.
506    ///
507    /// # Example
508    /// ```rust
509    /// # use hydro_lang::prelude::*;
510    /// # use futures::StreamExt;
511    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
512    /// let tick = process.tick();
513    /// let numbers = process
514    ///   .source_iter(q!(vec![123, 456, 789]))
515    ///   .batch(&tick, nondet!(/** test */));
516    /// let min = numbers.clone().min(); // Optional
517    /// let max = numbers.max(); // Optional
518    /// min.zip(max).all_ticks()
519    /// # }, |mut stream| async move {
520    /// // [(123, 789)]
521    /// # for w in vec![(123, 789)] {
522    /// #     assert_eq!(stream.next().await.unwrap(), w);
523    /// # }
524    /// # }));
525    /// ```
526    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
527    where
528        O: Clone,
529    {
530        let other: Optional<O, L, B> = other.into();
531        check_matching_location(&self.location, &other.location);
532
533        if L::is_top_level() {
534            Optional::new(
535                self.location.clone(),
536                HydroNode::Persist {
537                    inner: Box::new(HydroNode::CrossSingleton {
538                        left: Box::new(HydroNode::Unpersist {
539                            inner: Box::new(self.ir_node.into_inner()),
540                            metadata: self.location.new_node_metadata::<T>(),
541                        }),
542                        right: Box::new(HydroNode::Unpersist {
543                            inner: Box::new(other.ir_node.into_inner()),
544                            metadata: self.location.new_node_metadata::<O>(),
545                        }),
546                        metadata: self.location.new_node_metadata::<(T, O)>(),
547                    }),
548                    metadata: self.location.new_node_metadata::<(T, O)>(),
549                },
550            )
551        } else {
552            Optional::new(
553                self.location.clone(),
554                HydroNode::CrossSingleton {
555                    left: Box::new(self.ir_node.into_inner()),
556                    right: Box::new(other.ir_node.into_inner()),
557                    metadata: self.location.new_node_metadata::<(T, O)>(),
558                },
559            )
560        }
561    }
562
563    /// Passes through `self` when it has a value, otherwise passes through `other`.
564    ///
565    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
566    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
567    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
568    ///
569    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
570    /// of the inputs change (including to/from null states).
571    ///
572    /// # Example
573    /// ```rust
574    /// # use hydro_lang::prelude::*;
575    /// # use futures::StreamExt;
576    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
577    /// let tick = process.tick();
578    /// // ticks are lazy by default, forces the second tick to run
579    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
580    ///
581    /// let some_first_tick = tick.optional_first_tick(q!(123));
582    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
583    /// some_first_tick.or(some_second_tick).all_ticks()
584    /// # }, |mut stream| async move {
585    /// // [123 /* first tick */, 456 /* second tick */]
586    /// # for w in vec![123, 456] {
587    /// #     assert_eq!(stream.next().await.unwrap(), w);
588    /// # }
589    /// # }));
590    /// ```
591    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
592        check_matching_location(&self.location, &other.location);
593
594        if L::is_top_level() {
595            Optional::new(
596                self.location.clone(),
597                HydroNode::Persist {
598                    inner: Box::new(HydroNode::ChainFirst {
599                        first: Box::new(HydroNode::Unpersist {
600                            inner: Box::new(self.ir_node.into_inner()),
601                            metadata: self.location.new_node_metadata::<T>(),
602                        }),
603                        second: Box::new(HydroNode::Unpersist {
604                            inner: Box::new(other.ir_node.into_inner()),
605                            metadata: self.location.new_node_metadata::<T>(),
606                        }),
607                        metadata: self.location.new_node_metadata::<T>(),
608                    }),
609                    metadata: self.location.new_node_metadata::<T>(),
610                },
611            )
612        } else {
613            Optional::new(
614                self.location.clone(),
615                HydroNode::ChainFirst {
616                    first: Box::new(self.ir_node.into_inner()),
617                    second: Box::new(other.ir_node.into_inner()),
618                    metadata: self.location.new_node_metadata::<T>(),
619                },
620            )
621        }
622    }
623
624    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
625    ///
626    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
627    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
628    ///
629    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
630    /// of the inputs change (including to/from null states).
631    ///
632    /// # Example
633    /// ```rust
634    /// # use hydro_lang::prelude::*;
635    /// # use futures::StreamExt;
636    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
637    /// let tick = process.tick();
638    /// // ticks are lazy by default, forces the later ticks to run
639    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
640    ///
641    /// let some_first_tick = tick.optional_first_tick(q!(123));
642    /// some_first_tick
643    ///     .unwrap_or(tick.singleton(q!(456)))
644    ///     .all_ticks()
645    /// # }, |mut stream| async move {
646    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
647    /// # for w in vec![123, 456, 456, 456] {
648    /// #     assert_eq!(stream.next().await.unwrap(), w);
649    /// # }
650    /// # }));
651    /// ```
652    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
653        let res_option = self.or(other.into());
654        Singleton::new(res_option.location, res_option.ir_node.into_inner())
655    }
656
657    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
658    ///
659    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
660    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
661    /// so that Hydro can skip any computation on null values.
662    ///
663    /// # Example
664    /// ```rust
665    /// # use hydro_lang::prelude::*;
666    /// # use futures::StreamExt;
667    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
668    /// let tick = process.tick();
669    /// // ticks are lazy by default, forces the later ticks to run
670    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
671    ///
672    /// let some_first_tick = tick.optional_first_tick(q!(123));
673    /// some_first_tick.into_singleton().all_ticks()
674    /// # }, |mut stream| async move {
675    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
676    /// # for w in vec![Some(123), None, None, None] {
677    /// #     assert_eq!(stream.next().await.unwrap(), w);
678    /// # }
679    /// # }));
680    /// ```
681    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
682    where
683        T: Clone,
684    {
685        let none: syn::Expr = parse_quote!([::std::option::Option::None]);
686        let core_ir = HydroNode::Persist {
687            inner: Box::new(HydroNode::Source {
688                source: HydroSource::Iter(none.into()),
689                metadata: self.location.root().new_node_metadata::<Option<T>>(),
690            }),
691            metadata: self.location.new_node_metadata::<Option<T>>(),
692        };
693
694        let none_singleton = if L::is_top_level() {
695            Singleton::new(
696                self.location.clone(),
697                HydroNode::Persist {
698                    inner: Box::new(core_ir),
699                    metadata: self.location.new_node_metadata::<Option<T>>(),
700                },
701            )
702        } else {
703            Singleton::new(self.location.clone(), core_ir)
704        };
705
706        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
707    }
708
709    /// An operator which allows you to "name" a `HydroNode`.
710    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
711    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
712        {
713            let mut node = self.ir_node.borrow_mut();
714            let metadata = node.metadata_mut();
715            metadata.tag = Some(name.to_string());
716        }
717        self
718    }
719}
720
721impl<'a, T, L> Optional<T, L, Bounded>
722where
723    L: Location<'a>,
724{
725    /// Filters this optional, passing through the optional value if it is non-null **and** the
726    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
727    ///
728    /// Useful for conditionally processing, such as only emitting an optional's value outside
729    /// a tick if some other condition is satisfied.
730    ///
731    /// # Example
732    /// ```rust
733    /// # use hydro_lang::prelude::*;
734    /// # use futures::StreamExt;
735    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
736    /// let tick = process.tick();
737    /// // ticks are lazy by default, forces the second tick to run
738    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
739    ///
740    /// let batch_first_tick = process
741    ///   .source_iter(q!(vec![]))
742    ///   .batch(&tick, nondet!(/** test */));
743    /// let batch_second_tick = process
744    ///   .source_iter(q!(vec![456]))
745    ///   .batch(&tick, nondet!(/** test */))
746    ///   .defer_tick(); // appears on the second tick
747    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
748    /// batch_first_tick.chain(batch_second_tick).first()
749    ///   .filter_if_some(some_on_first_tick)
750    ///   .unwrap_or(tick.singleton(q!(789)))
751    ///   .all_ticks()
752    /// # }, |mut stream| async move {
753    /// // [789, 789]
754    /// # for w in vec![789, 789] {
755    /// #     assert_eq!(stream.next().await.unwrap(), w);
756    /// # }
757    /// # }));
758    /// ```
759    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
760        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
761    }
762
763    /// Filters this optional, passing through the optional value if it is non-null **and** the
764    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
765    ///
766    /// Useful for conditionally processing, such as only emitting an optional's value outside
767    /// a tick if some other condition is satisfied.
768    ///
769    /// # Example
770    /// ```rust
771    /// # use hydro_lang::prelude::*;
772    /// # use futures::StreamExt;
773    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
774    /// let tick = process.tick();
775    /// // ticks are lazy by default, forces the second tick to run
776    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
777    ///
778    /// let batch_first_tick = process
779    ///   .source_iter(q!(vec![]))
780    ///   .batch(&tick, nondet!(/** test */));
781    /// let batch_second_tick = process
782    ///   .source_iter(q!(vec![456]))
783    ///   .batch(&tick, nondet!(/** test */))
784    ///   .defer_tick(); // appears on the second tick
785    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
786    /// batch_first_tick.chain(batch_second_tick).first()
787    ///   .filter_if_none(some_on_first_tick)
788    ///   .unwrap_or(tick.singleton(q!(789)))
789    ///   .all_ticks()
790    /// # }, |mut stream| async move {
791    /// // [789, 789]
792    /// # for w in vec![789, 456] {
793    /// #     assert_eq!(stream.next().await.unwrap(), w);
794    /// # }
795    /// # }));
796    /// ```
797    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
798        self.filter_if_some(
799            other
800                .map(q!(|_| ()))
801                .into_singleton()
802                .filter(q!(|o| o.is_none())),
803        )
804    }
805
806    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
807    ///
808    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
809    /// having a value, such as only releasing a piece of state if the node is the leader.
810    ///
811    /// # Example
812    /// ```rust
813    /// # use hydro_lang::prelude::*;
814    /// # use futures::StreamExt;
815    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
816    /// let tick = process.tick();
817    /// // ticks are lazy by default, forces the second tick to run
818    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
819    ///
820    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
821    /// some_on_first_tick
822    ///     .if_some_then(tick.singleton(q!(456)))
823    ///     .unwrap_or(tick.singleton(q!(123)))
824    /// # .all_ticks()
825    /// # }, |mut stream| async move {
826    /// // 456 (first tick) ~> 123 (second tick onwards)
827    /// # for w in vec![456, 123, 123] {
828    /// #     assert_eq!(stream.next().await.unwrap(), w);
829    /// # }
830    /// # }));
831    /// ```
832    pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
833        value.filter_if_some(self)
834    }
835}
836
837impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
838where
839    L: Location<'a> + NoTick,
840{
841    /// Returns an optional value corresponding to the latest snapshot of the optional
842    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
843    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
844    /// all snapshots of this optional into the atomic-associated tick will observe the
845    /// same value each tick.
846    ///
847    /// # Non-Determinism
848    /// Because this picks a snapshot of a optional whose value is continuously changing,
849    /// the output optional has a non-deterministic value since the snapshot can be at an
850    /// arbitrary point in time.
851    pub fn snapshot(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
852        Optional::new(
853            self.location.clone().tick,
854            HydroNode::Unpersist {
855                inner: Box::new(self.ir_node.into_inner()),
856                metadata: self.location.new_node_metadata::<T>(),
857            },
858        )
859    }
860
861    /// Returns this optional back into a top-level, asynchronous execution context where updates
862    /// to the value will be asynchronously propagated.
863    pub fn end_atomic(self) -> Optional<T, L, B> {
864        Optional::new(self.location.tick.l, self.ir_node.into_inner())
865    }
866}
867
868impl<'a, T, L, B: Boundedness> Optional<T, L, B>
869where
870    L: Location<'a> + NoTick + NoAtomic,
871{
872    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
873    /// will observe the same version of the value and will be executed synchronously before any
874    /// outputs are yielded (in [`Optional::end_atomic`]).
875    ///
876    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
877    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
878    /// a different version).
879    ///
880    /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
881    /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
882    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
883    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
884        Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
885    }
886
887    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
888    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
889    /// relevant data that contributed to the snapshot at tick `t`.
890    ///
891    /// # Non-Determinism
892    /// Because this picks a snapshot of a optional whose value is continuously changing,
893    /// the output optional has a non-deterministic value since the snapshot can be at an
894    /// arbitrary point in time.
895    pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
896        self.atomic(tick).snapshot(nondet)
897    }
898
899    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
900    /// with order corresponding to increasing prefixes of data contributing to the optional.
901    ///
902    /// # Non-Determinism
903    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
904    /// to non-deterministic batching and arrival of inputs, the output stream is
905    /// non-deterministic.
906    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
907        let tick = self.location.tick();
908        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
909    }
910
911    /// Given a time interval, returns a stream corresponding to snapshots of the optional
912    /// value taken at various points in time. Because the input optional may be
913    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
914    /// represent the value of the optional given some prefix of the streams leading up to
915    /// it.
916    ///
917    /// # Non-Determinism
918    /// The output stream is non-deterministic in which elements are sampled, since this
919    /// is controlled by a clock.
920    pub fn sample_every(
921        self,
922        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
923        nondet: NonDet,
924    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
925        let samples = self.location.source_interval(interval, nondet);
926        let tick = self.location.tick();
927
928        self.snapshot(&tick, nondet)
929            .filter_if_some(samples.batch(&tick, nondet).first())
930            .all_ticks()
931            .weakest_retries()
932    }
933}
934
935impl<'a, T, L> Optional<T, Tick<L>, Bounded>
936where
937    L: Location<'a>,
938{
939    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
940    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
941    /// null values).
942    ///
943    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
944    /// producing one element in the output for each (non-null) tick. This is useful for batched
945    /// computations, where the results from each tick must be combined together.
946    ///
947    /// # Example
948    /// ```rust
949    /// # use hydro_lang::prelude::*;
950    /// # use futures::StreamExt;
951    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
952    /// # let tick = process.tick();
953    /// # // ticks are lazy by default, forces the second tick to run
954    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
955    /// # let batch_first_tick = process
956    /// #   .source_iter(q!(vec![]))
957    /// #   .batch(&tick, nondet!(/** test */));
958    /// # let batch_second_tick = process
959    /// #   .source_iter(q!(vec![1, 2, 3]))
960    /// #   .batch(&tick, nondet!(/** test */))
961    /// #   .defer_tick(); // appears on the second tick
962    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
963    /// input_batch // first tick: [], second tick: [1, 2, 3]
964    ///     .max()
965    ///     .all_ticks()
966    /// # }, |mut stream| async move {
967    /// // [3]
968    /// # for w in vec![3] {
969    /// #     assert_eq!(stream.next().await.unwrap(), w);
970    /// # }
971    /// # }));
972    /// ```
973    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
974        self.into_stream().all_ticks()
975    }
976
977    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
978    /// which will stream the value computed in _each_ tick as a separate stream element.
979    ///
980    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
981    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
982    /// optional's [`Tick`] context.
983    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
984        self.into_stream().all_ticks_atomic()
985    }
986
987    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
988    /// be asynchronously updated with the latest value of the optional inside the tick, including
989    /// whether the optional is null or not.
990    ///
991    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
992    /// tick that tracks the inner value. This is useful for getting the value as of the
993    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
994    ///
995    /// # Example
996    /// ```rust
997    /// # use hydro_lang::prelude::*;
998    /// # use futures::StreamExt;
999    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1000    /// # let tick = process.tick();
1001    /// # // ticks are lazy by default, forces the second tick to run
1002    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1003    /// # let batch_first_tick = process
1004    /// #   .source_iter(q!(vec![]))
1005    /// #   .batch(&tick, nondet!(/** test */));
1006    /// # let batch_second_tick = process
1007    /// #   .source_iter(q!(vec![1, 2, 3]))
1008    /// #   .batch(&tick, nondet!(/** test */))
1009    /// #   .defer_tick(); // appears on the second tick
1010    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1011    /// input_batch // first tick: [], second tick: [1, 2, 3]
1012    ///     .max()
1013    ///     .latest()
1014    /// # .into_singleton()
1015    /// # .sample_eager(nondet!(/** test */))
1016    /// # }, |mut stream| async move {
1017    /// // asynchronously changes from None ~> 3
1018    /// # for w in vec![None, Some(3)] {
1019    /// #     assert_eq!(stream.next().await.unwrap(), w);
1020    /// # }
1021    /// # }));
1022    /// ```
1023    pub fn latest(self) -> Optional<T, L, Unbounded> {
1024        Optional::new(
1025            self.location.outer().clone(),
1026            HydroNode::Persist {
1027                inner: Box::new(self.ir_node.into_inner()),
1028                metadata: self.location.new_node_metadata::<T>(),
1029            },
1030        )
1031    }
1032
1033    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1034    /// be updated with the latest value of the optional inside the tick.
1035    ///
1036    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1037    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1038    /// optional's [`Tick`] context.
1039    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1040        Optional::new(
1041            Atomic {
1042                tick: self.location.clone(),
1043            },
1044            HydroNode::Persist {
1045                inner: Box::new(self.ir_node.into_inner()),
1046                metadata: self.location.new_node_metadata::<T>(),
1047            },
1048        )
1049    }
1050
1051    #[expect(missing_docs, reason = "TODO")]
1052    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1053        Optional::new(
1054            self.location.clone(),
1055            HydroNode::DeferTick {
1056                input: Box::new(self.ir_node.into_inner()),
1057                metadata: self.location.new_node_metadata::<T>(),
1058            },
1059        )
1060    }
1061
1062    #[deprecated(note = "use .into_stream().persist()")]
1063    #[expect(missing_docs, reason = "deprecated")]
1064    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1065        Stream::new(
1066            self.location.clone(),
1067            HydroNode::Persist {
1068                inner: Box::new(self.ir_node.into_inner()),
1069                metadata: self.location.new_node_metadata::<T>(),
1070            },
1071        )
1072    }
1073
1074    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1075    /// non-null. Otherwise, the stream is empty.
1076    ///
1077    /// # Example
1078    /// ```rust
1079    /// # use hydro_lang::prelude::*;
1080    /// # use futures::StreamExt;
1081    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1082    /// # let tick = process.tick();
1083    /// # // ticks are lazy by default, forces the second tick to run
1084    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1085    /// # let batch_first_tick = process
1086    /// #   .source_iter(q!(vec![]))
1087    /// #   .batch(&tick, nondet!(/** test */));
1088    /// # let batch_second_tick = process
1089    /// #   .source_iter(q!(vec![123, 456]))
1090    /// #   .batch(&tick, nondet!(/** test */))
1091    /// #   .defer_tick(); // appears on the second tick
1092    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1093    /// input_batch // first tick: [], second tick: [123, 456]
1094    ///     .clone()
1095    ///     .max()
1096    ///     .into_stream()
1097    ///     .chain(input_batch)
1098    ///     .all_ticks()
1099    /// # }, |mut stream| async move {
1100    /// // [456, 123, 456]
1101    /// # for w in vec![456, 123, 456] {
1102    /// #     assert_eq!(stream.next().await.unwrap(), w);
1103    /// # }
1104    /// # }));
1105    /// ```
1106    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1107        Stream::new(self.location, self.ir_node.into_inner())
1108    }
1109}
1110
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use stageleft::q;

    use super::Optional;
    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    /// `or`-ing an inhabited optional with itself must still yield a single element, so the
    /// per-tick element count observed externally is expected to be `1`.
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        // Dataflow under test: one process plus an external endpoint to observe the output.
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let inhabited: Optional<_, _, _> = node_tick.singleton(q!(123)).into();

        // Combine the optional with a clone of itself and count the resulting elements.
        let combined = inhabited.clone().or(inhabited);
        let per_tick_count = combined.into_stream().count();
        let counts = per_tick_count.all_ticks().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output before starting, so the first emitted count is not missed.
        let mut external_out = nodes.connect_source_bincode(counts).await;

        deployment.start().await.unwrap();

        let first_count = external_out.next().await.unwrap();
        assert_eq!(first_count, 1);
    }
}