Skip to main content

hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick};
23use crate::location::{Location, Tick, TopLevel, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25use crate::prelude::KeyedSingleton;
26use crate::properties::{StreamMapFuncAlgebra, ValidMutCommutativityFor, ValidMutIdempotenceFor};
27
28/// A *nullable* Rust value that can asynchronously change over time.
29///
30/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
31/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
32/// asynchronously change over time, including becoming present of uninhabited.
33///
34/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
35/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
36///
37/// Type Parameters:
38/// - `Type`: the type of the value in this optional (when it is not null)
39/// - `Loc`: the [`Location`] where the optional is materialized
40/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
41pub struct Optional<Type, Loc, Bound: Boundedness> {
42    pub(crate) location: Loc,
43    pub(crate) ir_node: RefCell<HydroNode>,
44    pub(crate) flow_state: FlowState,
45
46    _phantom: PhantomData<(Type, Loc, Bound)>,
47}
48
49impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
50    fn drop(&mut self) {
51        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
52        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
53            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
54                input: Box::new(ir_node),
55                op_metadata: HydroIrOpMetadata::new(),
56            });
57        }
58    }
59}
60
61impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
62where
63    T: Clone,
64    L: Location<'a>,
65{
66    fn from(value: Optional<T, L, Bounded>) -> Self {
67        let tick = value.location().tick();
68        value.clone_into_tick(&tick).latest()
69    }
70}
71
72impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
73where
74    L: Location<'a>,
75{
76    fn defer_tick(self) -> Self {
77        Optional::defer_tick(self)
78    }
79}
80
81impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
82where
83    L: Location<'a>,
84{
85    type Location = Tick<L>;
86
87    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
88        Optional::new(
89            location.clone(),
90            HydroNode::CycleSource {
91                cycle_id,
92                metadata: location.new_node_metadata(Self::collection_kind()),
93            },
94        )
95    }
96}
97
98impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
99where
100    L: Location<'a>,
101{
102    type Location = Tick<L>;
103
104    fn location(&self) -> &Self::Location {
105        self.location()
106    }
107
108    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
109        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
110            location.clone(),
111            HydroNode::DeferTick {
112                input: Box::new(HydroNode::CycleSource {
113                    cycle_id,
114                    metadata: location.new_node_metadata(Self::collection_kind()),
115                }),
116                metadata: location
117                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
118            },
119        );
120
121        from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
122    }
123}
124
125impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
126where
127    L: Location<'a>,
128{
129    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
130        assert_eq!(
131            Location::id(&self.location),
132            expected_location,
133            "locations do not match"
134        );
135        self.location
136            .flow_state()
137            .borrow_mut()
138            .push_root(HydroRoot::CycleSink {
139                cycle_id,
140                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
141                op_metadata: HydroIrOpMetadata::new(),
142            });
143    }
144}
145
146impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
147where
148    L: Location<'a>,
149{
150    type Location = L;
151
152    fn create_source(cycle_id: CycleId, location: L) -> Self {
153        Optional::new(
154            location.clone(),
155            HydroNode::CycleSource {
156                cycle_id,
157                metadata: location.new_node_metadata(Self::collection_kind()),
158            },
159        )
160    }
161}
162
163impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
164where
165    L: Location<'a>,
166{
167    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
168        assert_eq!(
169            Location::id(&self.location),
170            expected_location,
171            "locations do not match"
172        );
173        self.location
174            .flow_state()
175            .borrow_mut()
176            .push_root(HydroRoot::CycleSink {
177                cycle_id,
178                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
179                op_metadata: HydroIrOpMetadata::new(),
180            });
181    }
182}
183
184impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
185where
186    L: Location<'a>,
187{
188    fn from(singleton: Singleton<T, L, B>) -> Self {
189        Optional::new(
190            singleton.location.clone(),
191            HydroNode::Cast {
192                inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
193                metadata: singleton
194                    .location
195                    .new_node_metadata(Self::collection_kind()),
196            },
197        )
198    }
199}
200
201#[cfg(stageleft_runtime)]
202pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
203    me: Optional<T, L, B>,
204    other: Optional<O, L, B>,
205) -> Optional<(T, O), L, B> {
206    check_matching_location(&me.location, &other.location);
207
208    Optional::new(
209        me.location.clone(),
210        HydroNode::CrossSingleton {
211            left: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
212            right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
213            metadata: me
214                .location
215                .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
216        },
217    )
218}
219
220#[cfg(stageleft_runtime)]
221fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
222    me: Optional<T, L, B>,
223    other: Optional<T, L, B>,
224) -> Optional<T, L, B> {
225    check_matching_location(&me.location, &other.location);
226
227    Optional::new(
228        me.location.clone(),
229        HydroNode::ChainFirst {
230            first: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
231            second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
232            metadata: me
233                .location
234                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
235        },
236    )
237}
238
239impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
240where
241    T: Clone,
242    L: Location<'a>,
243{
244    fn clone(&self) -> Self {
245        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
246            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
247            *self.ir_node.borrow_mut() = HydroNode::Tee {
248                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
249                metadata: self.location.new_node_metadata(Self::collection_kind()),
250            };
251        }
252
253        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
254            Optional {
255                location: self.location.clone(),
256                flow_state: self.flow_state.clone(),
257                ir_node: HydroNode::Tee {
258                    inner: SharedNode(inner.0.clone()),
259                    metadata: metadata.clone(),
260                }
261                .into(),
262                _phantom: PhantomData,
263            }
264        } else {
265            unreachable!()
266        }
267    }
268}
269
270impl<'a, T, L, B: Boundedness> Optional<T, L, B>
271where
272    L: Location<'a>,
273{
274    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
275        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
276        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
277        let flow_state = location.flow_state().clone();
278        Optional {
279            location,
280            flow_state,
281            ir_node: RefCell::new(ir_node),
282            _phantom: PhantomData,
283        }
284    }
285
286    pub(crate) fn collection_kind() -> CollectionKind {
287        CollectionKind::Optional {
288            bound: B::BOUND_KIND,
289            element_type: stageleft::quote_type::<T>().into(),
290        }
291    }
292
293    /// Returns the [`Location`] where this optional is being materialized.
294    pub fn location(&self) -> &L {
295        &self.location
296    }
297
298    /// Creates a shared reference handle to this optional that can be captured inside `q!()`
299    /// closures. The handle resolves to `&Option<T>` at runtime.
300    ///
301    /// The optional must be bounded, otherwise reading it would be non-deterministic.
302    pub fn by_ref(&self) -> crate::handoff_ref::OptionalRef<'a, '_, T, L>
303    where
304        B: IsBounded,
305    {
306        crate::handoff_ref::OptionalRef::new(&self.ir_node)
307    }
308
309    /// Returns a mutable reference handle to this optional that can be captured inside `q!()`
310    /// closures. The handle resolves to `&mut Option<T>` at runtime.
311    pub fn by_mut(&self) -> crate::handoff_ref::OptionalMut<'a, '_, T, L>
312    where
313        B: IsBounded,
314    {
315        crate::handoff_ref::OptionalMut::new(&self.ir_node)
316    }
317
318    /// Weakens the consistency of this live collection to not guarantee any consistency across
319    /// cluster members (if this collection is on a cluster).
320    pub fn weaken_consistency(self) -> Optional<T, L::DropConsistency, B>
321    where
322        L: Location<'a>,
323    {
324        if L::consistency()
325            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
326        {
327            // already no consistency
328            Optional::new(
329                self.location.drop_consistency(),
330                self.ir_node.replace(HydroNode::Placeholder),
331            )
332        } else {
333            Optional::new(
334                self.location.drop_consistency(),
335                HydroNode::Cast {
336                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
337                    metadata: self
338                        .location
339                        .clone()
340                        .drop_consistency()
341                        .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
342                },
343            )
344        }
345    }
346
347    /// Casts this live collection to have the consistency guarantees specified in the given
348    /// location type parameter. The developer must ensure that the strengthened consistency
349    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
350    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
351        self,
352        _proof: impl crate::properties::ConsistencyProof,
353    ) -> Optional<T, L2, B>
354    where
355        L: Location<'a>,
356    {
357        if L::consistency() == L2::consistency() {
358            Optional::new(
359                self.location.with_consistency_of(),
360                self.ir_node.replace(HydroNode::Placeholder),
361            )
362        } else {
363            Optional::new(
364                self.location.with_consistency_of(),
365                HydroNode::AssertIsConsistent {
366                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
367                    trusted: false,
368                    metadata: self
369                        .location
370                        .clone()
371                        .with_consistency_of::<L2>()
372                        .new_node_metadata(Optional::<T, L2, B>::collection_kind()),
373                },
374            )
375        }
376    }
377
378    /// Transforms the optional value by applying a function `f` to it,
379    /// continuously as the input is updated.
380    ///
381    /// Whenever the optional is empty, the output optional is also empty.
382    ///
383    /// # Example
384    /// ```rust
385    /// # #[cfg(feature = "deploy")] {
386    /// # use hydro_lang::prelude::*;
387    /// # use futures::StreamExt;
388    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
389    /// let tick = process.tick();
390    /// let optional = tick.optional_first_tick(q!(1));
391    /// optional.map(q!(|v| v + 1)).all_ticks()
392    /// # }, |mut stream| async move {
393    /// // 2
394    /// # assert_eq!(stream.next().await.unwrap(), 2);
395    /// # }));
396    /// # }
397    /// ```
398    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
399    where
400        F: Fn(T) -> U + 'a,
401    {
402        let f = f.splice_fn1_ctx(&self.location).into();
403        Optional::new(
404            self.location.clone(),
405            HydroNode::Map {
406                f,
407                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
408                metadata: self
409                    .location
410                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
411            },
412        )
413    }
414
415    /// Transforms the optional value by applying a function `f` to it and then flattening
416    /// the result into a stream, preserving the order of elements.
417    ///
418    /// If the optional is empty, the output stream is also empty. If the optional contains
419    /// a value, `f` is applied to produce an iterator, and all items from that iterator
420    /// are emitted in the output stream in deterministic order.
421    ///
422    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
423    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
424    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
425    ///
426    /// # Example
427    /// ```rust
428    /// # #[cfg(feature = "deploy")] {
429    /// # use hydro_lang::prelude::*;
430    /// # use futures::StreamExt;
431    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
432    /// let tick = process.tick();
433    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
434    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
435    /// # }, |mut stream| async move {
436    /// // 1, 2, 3
437    /// # for w in vec![1, 2, 3] {
438    /// #     assert_eq!(stream.next().await.unwrap(), w);
439    /// # }
440    /// # }));
441    /// # }
442    /// ```
443    pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
444        self,
445        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
446    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
447    where
448        B: IsBounded,
449        I: IntoIterator<Item = U>,
450        F: FnMut(T) -> I + 'a,
451        C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
452        Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
453    {
454        self.into_stream().flat_map_ordered(f)
455    }
456
457    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
458    /// for the output type `I` to produce items in any order.
459    ///
460    /// If the optional is empty, the output stream is also empty. If the optional contains
461    /// a value, `f` is applied to produce an iterator, and all items from that iterator
462    /// are emitted in the output stream in non-deterministic order.
463    ///
464    /// # Example
465    /// ```rust
466    /// # #[cfg(feature = "deploy")] {
467    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
468    /// # use futures::StreamExt;
469    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
470    /// let tick = process.tick();
471    /// let optional = tick.optional_first_tick(q!(
472    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
473    /// ));
474    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
475    /// # }, |mut stream| async move {
476    /// // 1, 2, 3, but in no particular order
477    /// # let mut results = Vec::new();
478    /// # for _ in 0..3 {
479    /// #     results.push(stream.next().await.unwrap());
480    /// # }
481    /// # results.sort();
482    /// # assert_eq!(results, vec![1, 2, 3]);
483    /// # }));
484    /// # }
485    /// ```
486    pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
487        self,
488        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
489    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
490    where
491        B: IsBounded,
492        I: IntoIterator<Item = U>,
493        F: FnMut(T) -> I + 'a,
494        C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
495        Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
496    {
497        self.into_stream().flat_map_unordered(f)
498    }
499
500    /// Flattens the optional value into a stream, preserving the order of elements.
501    ///
502    /// If the optional is empty, the output stream is also empty. If the optional contains
503    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
504    /// in the output stream in deterministic order.
505    ///
506    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
507    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
508    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
509    ///
510    /// # Example
511    /// ```rust
512    /// # #[cfg(feature = "deploy")] {
513    /// # use hydro_lang::prelude::*;
514    /// # use futures::StreamExt;
515    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
516    /// let tick = process.tick();
517    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
518    /// optional.flatten_ordered().all_ticks()
519    /// # }, |mut stream| async move {
520    /// // 1, 2, 3
521    /// # for w in vec![1, 2, 3] {
522    /// #     assert_eq!(stream.next().await.unwrap(), w);
523    /// # }
524    /// # }));
525    /// # }
526    /// ```
527    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
528    where
529        B: IsBounded,
530        T: IntoIterator<Item = U>,
531    {
532        self.flat_map_ordered(q!(|v| v))
533    }
534
535    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
536    /// for the element type `T` to produce items in any order.
537    ///
538    /// If the optional is empty, the output stream is also empty. If the optional contains
539    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
540    /// in the output stream in non-deterministic order.
541    ///
542    /// # Example
543    /// ```rust
544    /// # #[cfg(feature = "deploy")] {
545    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
546    /// # use futures::StreamExt;
547    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
548    /// let tick = process.tick();
549    /// let optional = tick.optional_first_tick(q!(
550    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
551    /// ));
552    /// optional.flatten_unordered().all_ticks()
553    /// # }, |mut stream| async move {
554    /// // 1, 2, 3, but in no particular order
555    /// # let mut results = Vec::new();
556    /// # for _ in 0..3 {
557    /// #     results.push(stream.next().await.unwrap());
558    /// # }
559    /// # results.sort();
560    /// # assert_eq!(results, vec![1, 2, 3]);
561    /// # }));
562    /// # }
563    /// ```
564    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
565    where
566        B: IsBounded,
567        T: IntoIterator<Item = U>,
568    {
569        self.flat_map_unordered(q!(|v| v))
570    }
571
572    /// Creates an optional containing only the value if it satisfies a predicate `f`.
573    ///
574    /// If the optional is empty, the output optional is also empty. If the optional contains
575    /// a value and the predicate returns `true`, the output optional contains the same value.
576    /// If the predicate returns `false`, the output optional is empty.
577    ///
578    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
579    /// not modify or take ownership of the value. If you need to modify the value while filtering
580    /// use [`Optional::filter_map`] instead.
581    ///
582    /// # Example
583    /// ```rust
584    /// # #[cfg(feature = "deploy")] {
585    /// # use hydro_lang::prelude::*;
586    /// # use futures::StreamExt;
587    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
588    /// let tick = process.tick();
589    /// let optional = tick.optional_first_tick(q!(5));
590    /// optional.filter(q!(|&x| x > 3)).all_ticks()
591    /// # }, |mut stream| async move {
592    /// // 5
593    /// # assert_eq!(stream.next().await.unwrap(), 5);
594    /// # }));
595    /// # }
596    /// ```
597    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
598    where
599        F: Fn(&T) -> bool + 'a,
600    {
601        let f = f.splice_fn1_borrow_ctx(&self.location).into();
602        Optional::new(
603            self.location.clone(),
604            HydroNode::Filter {
605                f,
606                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
607                metadata: self.location.new_node_metadata(Self::collection_kind()),
608            },
609        )
610    }
611
612    /// An operator that both filters and maps. It yields only the value if the supplied
613    /// closure `f` returns `Some(value)`.
614    ///
615    /// If the optional is empty, the output optional is also empty. If the optional contains
616    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
617    /// If the closure returns `None`, the output optional is empty.
618    ///
619    /// # Example
620    /// ```rust
621    /// # #[cfg(feature = "deploy")] {
622    /// # use hydro_lang::prelude::*;
623    /// # use futures::StreamExt;
624    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
625    /// let tick = process.tick();
626    /// let optional = tick.optional_first_tick(q!("42"));
627    /// optional
628    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
629    ///     .all_ticks()
630    /// # }, |mut stream| async move {
631    /// // 42
632    /// # assert_eq!(stream.next().await.unwrap(), 42);
633    /// # }));
634    /// # }
635    /// ```
636    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
637    where
638        F: Fn(T) -> Option<U> + 'a,
639    {
640        let f = f.splice_fn1_ctx(&self.location).into();
641        Optional::new(
642            self.location.clone(),
643            HydroNode::FilterMap {
644                f,
645                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
646                metadata: self
647                    .location
648                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
649            },
650        )
651    }
652
653    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
654    ///
655    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
656    /// non-null. This is useful for combining several pieces of state together.
657    ///
658    /// # Example
659    /// ```rust
660    /// # #[cfg(feature = "deploy")] {
661    /// # use hydro_lang::prelude::*;
662    /// # use futures::StreamExt;
663    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
664    /// let tick = process.tick();
665    /// let numbers = process
666    ///   .source_iter(q!(vec![123, 456, 789]))
667    ///   .batch(&tick, nondet!(/** test */));
668    /// let min = numbers.clone().min(); // Optional
669    /// let max = numbers.max(); // Optional
670    /// min.zip(max).all_ticks()
671    /// # }, |mut stream| async move {
672    /// // [(123, 789)]
673    /// # for w in vec![(123, 789)] {
674    /// #     assert_eq!(stream.next().await.unwrap(), w);
675    /// # }
676    /// # }));
677    /// # }
678    /// ```
679    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
680    where
681        B: IsBounded,
682    {
683        let other: Optional<O, L, B> = other.into();
684        check_matching_location(&self.location, &other.location);
685
686        if L::is_top_level()
687            && let Some(tick) = self.location.try_tick()
688        {
689            let self_location = self.location().clone();
690            let out = zip_inside_tick(
691                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
692                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
693            )
694            .latest();
695
696            Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
697        } else {
698            zip_inside_tick(self, other)
699        }
700    }
701
702    /// Passes through `self` when it has a value, otherwise passes through `other`.
703    ///
704    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
705    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
706    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
707    ///
708    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
709    /// of the inputs change (including to/from null states).
710    ///
711    /// # Example
712    /// ```rust
713    /// # #[cfg(feature = "deploy")] {
714    /// # use hydro_lang::prelude::*;
715    /// # use futures::StreamExt;
716    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
717    /// let tick = process.tick();
718    /// // ticks are lazy by default, forces the second tick to run
719    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
720    ///
721    /// let some_first_tick = tick.optional_first_tick(q!(123));
722    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
723    /// some_first_tick.or(some_second_tick).all_ticks()
724    /// # }, |mut stream| async move {
725    /// // [123 /* first tick */, 456 /* second tick */]
726    /// # for w in vec![123, 456] {
727    /// #     assert_eq!(stream.next().await.unwrap(), w);
728    /// # }
729    /// # }));
730    /// # }
731    /// ```
732    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
733        check_matching_location(&self.location, &other.location);
734
735        if L::is_top_level()
736            && !B::BOUNDED // only if unbounded we need to use a tick
737            && let Some(tick) = self.location.try_tick()
738        {
739            let self_location = self.location().clone();
740            let out = or_inside_tick(
741                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
742                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
743            )
744            .latest();
745
746            Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
747        } else {
748            Optional::new(
749                self.location.clone(),
750                HydroNode::ChainFirst {
751                    first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
752                    second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
753                    metadata: self.location.new_node_metadata(Self::collection_kind()),
754                },
755            )
756        }
757    }
758
759    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
760    ///
761    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
762    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
763    ///
764    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
765    /// of the inputs change (including to/from null states).
766    ///
767    /// # Example
768    /// ```rust
769    /// # #[cfg(feature = "deploy")] {
770    /// # use hydro_lang::prelude::*;
771    /// # use futures::StreamExt;
772    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
773    /// let tick = process.tick();
774    /// // ticks are lazy by default, forces the later ticks to run
775    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
776    ///
777    /// let some_first_tick = tick.optional_first_tick(q!(123));
778    /// some_first_tick
779    ///     .unwrap_or(tick.singleton(q!(456)))
780    ///     .all_ticks()
781    /// # }, |mut stream| async move {
782    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
783    /// # for w in vec![123, 456, 456, 456] {
784    /// #     assert_eq!(stream.next().await.unwrap(), w);
785    /// # }
786    /// # }));
787    /// # }
788    /// ```
789    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
790        let res_option = self.or(other.into());
791        Singleton::new(
792            res_option.location.clone(),
793            HydroNode::Cast {
794                inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
795                metadata: res_option
796                    .location
797                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
798            },
799        )
800    }
801
802    /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
803    ///
804    /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
805    /// [`Optional`] when the default value of the type is a suitable fallback.
806    ///
807    /// # Example
808    /// ```rust
809    /// # #[cfg(feature = "deploy")] {
810    /// # use hydro_lang::prelude::*;
811    /// # use futures::StreamExt;
812    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
813    /// let tick = process.tick();
814    /// // ticks are lazy by default, forces the later ticks to run
815    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
816    ///
817    /// let some_first_tick = tick.optional_first_tick(q!(123i32));
818    /// some_first_tick.unwrap_or_default().all_ticks()
819    /// # }, |mut stream| async move {
820    /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
821    /// # for w in vec![123, 0, 0, 0] {
822    /// #     assert_eq!(stream.next().await.unwrap(), w);
823    /// # }
824    /// # }));
825    /// # }
826    /// ```
827    pub fn unwrap_or_default(self) -> Singleton<T, L, B>
828    where
829        T: Default + Clone,
830    {
831        self.into_singleton().map(q!(|v| v.unwrap_or_default()))
832    }
833
834    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
835    ///
836    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
837    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
838    /// so that Hydro can skip any computation on null values.
839    ///
840    /// # Example
841    /// ```rust
842    /// # #[cfg(feature = "deploy")] {
843    /// # use hydro_lang::prelude::*;
844    /// # use futures::StreamExt;
845    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
846    /// let tick = process.tick();
847    /// // ticks are lazy by default, forces the later ticks to run
848    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
849    ///
850    /// let some_first_tick = tick.optional_first_tick(q!(123));
851    /// some_first_tick.into_singleton().all_ticks()
852    /// # }, |mut stream| async move {
853    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
854    /// # for w in vec![Some(123), None, None, None] {
855    /// #     assert_eq!(stream.next().await.unwrap(), w);
856    /// # }
857    /// # }));
858    /// # }
859    /// ```
860    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
861    where
862        T: Clone,
863    {
864        let none: syn::Expr = parse_quote!(::std::option::Option::None);
865
866        let none_singleton = Singleton::new(
867            self.location.clone(),
868            HydroNode::SingletonSource {
869                value: none.into(),
870                first_tick_only: false,
871                metadata: self
872                    .location
873                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
874            },
875        );
876
877        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
878    }
879
880    /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
881    ///
882    /// # Example
883    /// ```rust
884    /// # #[cfg(feature = "deploy")] {
885    /// # use hydro_lang::prelude::*;
886    /// # use futures::StreamExt;
887    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
888    /// let tick = process.tick();
889    /// // ticks are lazy by default, forces the second tick to run
890    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
891    ///
892    /// let some_first_tick = tick.optional_first_tick(q!(42));
893    /// some_first_tick.is_some().all_ticks()
894    /// # }, |mut stream| async move {
895    /// // [true /* first tick */, false /* second tick */, ...]
896    /// # for w in vec![true, false] {
897    /// #     assert_eq!(stream.next().await.unwrap(), w);
898    /// # }
899    /// # }));
900    /// # }
901    /// ```
902    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
903    pub fn is_some(self) -> Singleton<bool, L, B> {
904        self.map(q!(|_| ()))
905            .into_singleton()
906            .map(q!(|o| o.is_some()))
907    }
908
909    /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
910    ///
911    /// # Example
912    /// ```rust
913    /// # #[cfg(feature = "deploy")] {
914    /// # use hydro_lang::prelude::*;
915    /// # use futures::StreamExt;
916    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
917    /// let tick = process.tick();
918    /// // ticks are lazy by default, forces the second tick to run
919    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
920    ///
921    /// let some_first_tick = tick.optional_first_tick(q!(42));
922    /// some_first_tick.is_none().all_ticks()
923    /// # }, |mut stream| async move {
924    /// // [false /* first tick */, true /* second tick */, ...]
925    /// # for w in vec![false, true] {
926    /// #     assert_eq!(stream.next().await.unwrap(), w);
927    /// # }
928    /// # }));
929    /// # }
930    /// ```
931    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
932    pub fn is_none(self) -> Singleton<bool, L, B> {
933        self.map(q!(|_| ()))
934            .into_singleton()
935            .map(q!(|o| o.is_none()))
936    }
937
938    /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
939    /// values are equal, `false` otherwise (including when either is null).
940    ///
941    /// # Example
942    /// ```rust
943    /// # #[cfg(feature = "deploy")] {
944    /// # use hydro_lang::prelude::*;
945    /// # use futures::StreamExt;
946    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
947    /// let tick = process.tick();
948    /// // ticks are lazy by default, forces the second tick to run
949    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
950    ///
951    /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
952    /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
953    /// a.is_some_and_equals(b).all_ticks()
954    /// # }, |mut stream| async move {
955    /// // [true, false]
956    /// # for w in vec![true, false] {
957    /// #     assert_eq!(stream.next().await.unwrap(), w);
958    /// # }
959    /// # }));
960    /// # }
961    /// ```
962    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
963    pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
964    where
965        T: PartialEq + Clone,
966        B: IsBounded,
967    {
968        self.into_singleton()
969            .zip(other.into_singleton())
970            .map(q!(|(a, b)| a.is_some() && a == b))
971    }
972
973    /// An operator which allows you to "name" a `HydroNode`.
974    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
975    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
976        {
977            let mut node = self.ir_node.borrow_mut();
978            let metadata = node.metadata_mut();
979            metadata.tag = Some(name.to_owned());
980        }
981        self
982    }
983
984    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
985    /// implies that `B == Bounded`.
986    pub fn make_bounded(self) -> Optional<T, L, Bounded>
987    where
988        B: IsBounded,
989    {
990        Optional::new(
991            self.location.clone(),
992            self.ir_node.replace(HydroNode::Placeholder),
993        )
994    }
995
996    /// Clones this bounded optional into a tick, returning a optional that has the
997    /// same value as the outer optional. Because the outer optional is bounded, this
998    /// is deterministic because there is only a single immutable version.
999    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
1000    where
1001        B: IsBounded,
1002        T: Clone,
1003    {
1004        // TODO(shadaj): avoid printing simulator logs for this snapshot
1005        let inner = self.snapshot(
1006            tick,
1007            nondet!(/** bounded top-level optional so deterministic */),
1008        );
1009        Optional::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1010    }
1011
1012    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1013    /// non-null. Otherwise, the stream is empty.
1014    ///
1015    /// # Example
1016    /// ```rust
1017    /// # #[cfg(feature = "deploy")] {
1018    /// # use hydro_lang::prelude::*;
1019    /// # use futures::StreamExt;
1020    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1021    /// # let tick = process.tick();
1022    /// # // ticks are lazy by default, forces the second tick to run
1023    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1024    /// # let batch_first_tick = process
1025    /// #   .source_iter(q!(vec![]))
1026    /// #   .batch(&tick, nondet!(/** test */));
1027    /// # let batch_second_tick = process
1028    /// #   .source_iter(q!(vec![123, 456]))
1029    /// #   .batch(&tick, nondet!(/** test */))
1030    /// #   .defer_tick(); // appears on the second tick
1031    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1032    /// input_batch // first tick: [], second tick: [123, 456]
1033    ///     .clone()
1034    ///     .max()
1035    ///     .into_stream()
1036    ///     .chain(input_batch)
1037    ///     .all_ticks()
1038    /// # }, |mut stream| async move {
1039    /// // [456, 123, 456]
1040    /// # for w in vec![456, 123, 456] {
1041    /// #     assert_eq!(stream.next().await.unwrap(), w);
1042    /// # }
1043    /// # }));
1044    /// # }
1045    /// ```
1046    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1047    where
1048        B: IsBounded,
1049    {
1050        Stream::new(
1051            self.location.clone(),
1052            HydroNode::Cast {
1053                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1054                metadata: self.location.new_node_metadata(Stream::<
1055                    T,
1056                    Tick<L>,
1057                    Bounded,
1058                    TotalOrder,
1059                    ExactlyOnce,
1060                >::collection_kind()),
1061            },
1062        )
1063    }
1064
1065    /// Filters this optional, passing through the value if the boolean signal is `true`,
1066    /// otherwise the output is null.
1067    ///
1068    /// # Example
1069    /// ```rust
1070    /// # #[cfg(feature = "deploy")] {
1071    /// # use hydro_lang::prelude::*;
1072    /// # use futures::StreamExt;
1073    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1074    /// let tick = process.tick();
1075    /// // ticks are lazy by default, forces the second tick to run
1076    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1077    ///
1078    /// let some_first_tick = tick.optional_first_tick(q!(()));
1079    /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1080    /// let batch_first_tick = process
1081    ///   .source_iter(q!(vec![456]))
1082    ///   .batch(&tick, nondet!(/** test */));
1083    /// let batch_second_tick = process
1084    ///   .source_iter(q!(vec![789]))
1085    ///   .batch(&tick, nondet!(/** test */))
1086    ///   .defer_tick();
1087    /// batch_first_tick.chain(batch_second_tick).first()
1088    ///   .filter_if(signal)
1089    ///   .unwrap_or(tick.singleton(q!(0)))
1090    ///   .all_ticks()
1091    /// # }, |mut stream| async move {
1092    /// // [456, 0]
1093    /// # for w in vec![456, 0] {
1094    /// #     assert_eq!(stream.next().await.unwrap(), w);
1095    /// # }
1096    /// # }));
1097    /// # }
1098    /// ```
1099    pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1100    where
1101        B: IsBounded,
1102    {
1103        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1104    }
1105
1106    /// Filters this optional, passing through the optional value if it is non-null **and** the
1107    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1108    ///
1109    /// Useful for conditionally processing, such as only emitting an optional's value outside
1110    /// a tick if some other condition is satisfied.
1111    ///
1112    /// # Example
1113    /// ```rust
1114    /// # #[cfg(feature = "deploy")] {
1115    /// # use hydro_lang::prelude::*;
1116    /// # use futures::StreamExt;
1117    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1118    /// let tick = process.tick();
1119    /// // ticks are lazy by default, forces the second tick to run
1120    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1121    ///
1122    /// let batch_first_tick = process
1123    ///   .source_iter(q!(vec![]))
1124    ///   .batch(&tick, nondet!(/** test */));
1125    /// let batch_second_tick = process
1126    ///   .source_iter(q!(vec![456]))
1127    ///   .batch(&tick, nondet!(/** test */))
1128    ///   .defer_tick(); // appears on the second tick
1129    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1130    /// batch_first_tick.chain(batch_second_tick).first()
1131    ///   .filter_if_some(some_on_first_tick)
1132    ///   .unwrap_or(tick.singleton(q!(789)))
1133    ///   .all_ticks()
1134    /// # }, |mut stream| async move {
1135    /// // [789, 789]
1136    /// # for w in vec![789, 789] {
1137    /// #     assert_eq!(stream.next().await.unwrap(), w);
1138    /// # }
1139    /// # }));
1140    /// # }
1141    /// ```
1142    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1143    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1144    where
1145        B: IsBounded,
1146    {
1147        self.filter_if(signal.is_some())
1148    }
1149
1150    /// Filters this optional, passing through the optional value if it is non-null **and** the
1151    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1152    ///
1153    /// Useful for conditionally processing, such as only emitting an optional's value outside
1154    /// a tick if some other condition is satisfied.
1155    ///
1156    /// # Example
1157    /// ```rust
1158    /// # #[cfg(feature = "deploy")] {
1159    /// # use hydro_lang::prelude::*;
1160    /// # use futures::StreamExt;
1161    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1162    /// let tick = process.tick();
1163    /// // ticks are lazy by default, forces the second tick to run
1164    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1165    ///
1166    /// let batch_first_tick = process
1167    ///   .source_iter(q!(vec![]))
1168    ///   .batch(&tick, nondet!(/** test */));
1169    /// let batch_second_tick = process
1170    ///   .source_iter(q!(vec![456]))
1171    ///   .batch(&tick, nondet!(/** test */))
1172    ///   .defer_tick(); // appears on the second tick
1173    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1174    /// batch_first_tick.chain(batch_second_tick).first()
1175    ///   .filter_if_none(some_on_first_tick)
1176    ///   .unwrap_or(tick.singleton(q!(789)))
1177    ///   .all_ticks()
1178    /// # }, |mut stream| async move {
1179    /// // [789, 789]
1180    /// # for w in vec![789, 456] {
1181    /// #     assert_eq!(stream.next().await.unwrap(), w);
1182    /// # }
1183    /// # }));
1184    /// # }
1185    /// ```
1186    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1187    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1188    where
1189        B: IsBounded,
1190    {
1191        self.filter_if(other.is_none())
1192    }
1193
1194    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1195    ///
1196    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1197    /// having a value, such as only releasing a piece of state if the node is the leader.
1198    ///
1199    /// # Example
1200    /// ```rust
1201    /// # #[cfg(feature = "deploy")] {
1202    /// # use hydro_lang::prelude::*;
1203    /// # use futures::StreamExt;
1204    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1205    /// let tick = process.tick();
1206    /// // ticks are lazy by default, forces the second tick to run
1207    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1208    ///
1209    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1210    /// some_on_first_tick
1211    ///     .if_some_then(tick.singleton(q!(456)))
1212    ///     .unwrap_or(tick.singleton(q!(123)))
1213    /// # .all_ticks()
1214    /// # }, |mut stream| async move {
1215    /// // 456 (first tick) ~> 123 (second tick onwards)
1216    /// # for w in vec![456, 123, 123] {
1217    /// #     assert_eq!(stream.next().await.unwrap(), w);
1218    /// # }
1219    /// # }));
1220    /// # }
1221    /// ```
1222    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1223    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1224    where
1225        B: IsBounded,
1226    {
1227        value.filter_if(self.is_some())
1228    }
1229}
1230
1231impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1232where
1233    L: Location<'a>,
1234{
1235    /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1236    /// key-value pair of this [`Optional`].
1237    ///
1238    /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1239    /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1240    /// the entry will be updated and appear / disappear according to the state of the
1241    /// [`Optional`].
1242    pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1243        KeyedSingleton::new(
1244            self.location.clone(),
1245            HydroNode::Cast {
1246                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1247                metadata: self
1248                    .location
1249                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1250            },
1251        )
1252    }
1253}
1254
1255impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1256where
1257    L: Location<'a>,
1258{
1259    /// Returns an optional value corresponding to the latest snapshot of the optional
1260    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1261    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1262    /// all snapshots of this optional into the atomic-associated tick will observe the
1263    /// same value each tick.
1264    ///
1265    /// # Non-Determinism
1266    /// Because this picks a snapshot of a optional whose value is continuously changing,
1267    /// the output optional has a non-deterministic value since the snapshot can be at an
1268    /// arbitrary point in time.
1269    pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1270        self,
1271        tick: &Tick<L2>,
1272        _nondet: NonDet,
1273    ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1274        Optional::new(
1275            tick.drop_consistency(),
1276            HydroNode::Batch {
1277                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1278                metadata: tick
1279                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1280            },
1281        )
1282    }
1283
1284    /// Returns this optional back into a top-level, asynchronous execution context where updates
1285    /// to the value will be asynchronously propagated.
1286    pub fn end_atomic(self) -> Optional<T, L, B> {
1287        Optional::new(
1288            self.location.tick.l.clone(),
1289            HydroNode::EndAtomic {
1290                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1291                metadata: self
1292                    .location
1293                    .tick
1294                    .l
1295                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1296            },
1297        )
1298    }
1299}
1300
1301impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1302where
1303    L: Location<'a>,
1304{
1305    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1306    /// will observe the same version of the value and will be executed synchronously before any
1307    /// outputs are yielded (in [`Optional::end_atomic`]).
1308    ///
1309    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1310    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1311    /// a different version).
1312    pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1313        let id = self.location.flow_state().borrow_mut().next_clock_id();
1314        let out_location = Atomic {
1315            tick: Tick {
1316                id,
1317                l: self.location.clone(),
1318            },
1319        };
1320        Optional::new(
1321            out_location.clone(),
1322            HydroNode::BeginAtomic {
1323                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1324                metadata: out_location
1325                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1326            },
1327        )
1328    }
1329
1330    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1331    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1332    /// relevant data that contributed to the snapshot at tick `t`.
1333    ///
1334    /// # Non-Determinism
1335    /// Because this picks a snapshot of a optional whose value is continuously changing,
1336    /// the output optional has a non-deterministic value since the snapshot can be at an
1337    /// arbitrary point in time.
1338    pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1339        self,
1340        tick: &Tick<L2>,
1341        _nondet: NonDet,
1342    ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1343        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1344        Optional::new(
1345            tick.drop_consistency(),
1346            HydroNode::Batch {
1347                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1348                metadata: tick
1349                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1350            },
1351        )
1352    }
1353
1354    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1355    /// with order corresponding to increasing prefixes of data contributing to the optional.
1356    ///
1357    /// # Non-Determinism
1358    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1359    /// to non-deterministic batching and arrival of inputs, the output stream is
1360    /// non-deterministic.
1361    pub fn sample_eager(
1362        self,
1363        nondet: NonDet,
1364    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1365        let tick = self.location.tick();
1366        self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1367    }
1368
1369    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1370    /// value taken at various points in time. Because the input optional may be
1371    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1372    /// represent the value of the optional given some prefix of the streams leading up to
1373    /// it.
1374    ///
1375    /// # Non-Determinism
1376    /// The output stream is non-deterministic in which elements are sampled, since this
1377    /// is controlled by a clock.
1378    pub fn sample_every(
1379        self,
1380        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1381        nondet: NonDet,
1382    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1383    where
1384        L: TopLevel<'a>,
1385    {
1386        let samples = self.location.source_interval(interval);
1387        let tick = self.location.tick();
1388
1389        self.snapshot(&tick, nondet)
1390            .filter_if(samples.batch(&tick, nondet).first().is_some())
1391            .all_ticks()
1392            .weaken_retries()
1393    }
1394}
1395
1396impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1397where
1398    L: Location<'a>,
1399{
1400    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1401    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1402    /// null values).
1403    ///
1404    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1405    /// producing one element in the output for each (non-null) tick. This is useful for batched
1406    /// computations, where the results from each tick must be combined together.
1407    ///
1408    /// # Example
1409    /// ```rust
1410    /// # #[cfg(feature = "deploy")] {
1411    /// # use hydro_lang::prelude::*;
1412    /// # use futures::StreamExt;
1413    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1414    /// # let tick = process.tick();
1415    /// # // ticks are lazy by default, forces the second tick to run
1416    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1417    /// # let batch_first_tick = process
1418    /// #   .source_iter(q!(vec![]))
1419    /// #   .batch(&tick, nondet!(/** test */));
1420    /// # let batch_second_tick = process
1421    /// #   .source_iter(q!(vec![1, 2, 3]))
1422    /// #   .batch(&tick, nondet!(/** test */))
1423    /// #   .defer_tick(); // appears on the second tick
1424    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1425    /// input_batch // first tick: [], second tick: [1, 2, 3]
1426    ///     .max()
1427    ///     .all_ticks()
1428    /// # }, |mut stream| async move {
1429    /// // [3]
1430    /// # for w in vec![3] {
1431    /// #     assert_eq!(stream.next().await.unwrap(), w);
1432    /// # }
1433    /// # }));
1434    /// # }
1435    /// ```
1436    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1437        self.into_stream().all_ticks()
1438    }
1439
1440    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1441    /// which will stream the value computed in _each_ tick as a separate stream element.
1442    ///
1443    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1444    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1445    /// optional's [`Tick`] context.
1446    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1447        self.into_stream().all_ticks_atomic()
1448    }
1449
1450    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1451    /// be asynchronously updated with the latest value of the optional inside the tick, including
1452    /// whether the optional is null or not.
1453    ///
1454    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1455    /// tick that tracks the inner value. This is useful for getting the value as of the
1456    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1457    ///
1458    /// # Example
1459    /// ```rust
1460    /// # #[cfg(feature = "deploy")] {
1461    /// # use hydro_lang::prelude::*;
1462    /// # use futures::StreamExt;
1463    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1464    /// # let tick = process.tick();
1465    /// # // ticks are lazy by default, forces the second tick to run
1466    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1467    /// # let batch_first_tick = process
1468    /// #   .source_iter(q!(vec![]))
1469    /// #   .batch(&tick, nondet!(/** test */));
1470    /// # let batch_second_tick = process
1471    /// #   .source_iter(q!(vec![1, 2, 3]))
1472    /// #   .batch(&tick, nondet!(/** test */))
1473    /// #   .defer_tick(); // appears on the second tick
1474    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1475    /// input_batch // first tick: [], second tick: [1, 2, 3]
1476    ///     .max()
1477    ///     .latest()
1478    /// # .into_singleton()
1479    /// # .sample_eager(nondet!(/** test */))
1480    /// # }, |mut stream| async move {
1481    /// // asynchronously changes from None ~> 3
1482    /// # for w in vec![None, Some(3)] {
1483    /// #     assert_eq!(stream.next().await.unwrap(), w);
1484    /// # }
1485    /// # }));
1486    /// # }
1487    /// ```
1488    pub fn latest(self) -> Optional<T, L, Unbounded> {
1489        Optional::new(
1490            self.location.outer().clone(),
1491            HydroNode::YieldConcat {
1492                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1493                metadata: self
1494                    .location
1495                    .outer()
1496                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1497            },
1498        )
1499    }
1500
1501    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1502    /// be updated with the latest value of the optional inside the tick.
1503    ///
1504    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1505    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1506    /// optional's [`Tick`] context.
1507    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1508        let out_location = Atomic {
1509            tick: self.location.clone(),
1510        };
1511
1512        Optional::new(
1513            out_location.clone(),
1514            HydroNode::YieldConcat {
1515                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1516                metadata: out_location
1517                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1518            },
1519        )
1520    }
1521
1522    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1523    /// always has the state of `self` at tick `T - 1`.
1524    ///
1525    /// At tick `0`, the output optional is null, since there is no previous tick.
1526    ///
1527    /// This operator enables stateful iterative processing with ticks, by sending data from one
1528    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1529    ///
1530    /// # Example
1531    /// ```rust
1532    /// # #[cfg(feature = "deploy")] {
1533    /// # use hydro_lang::prelude::*;
1534    /// # use futures::StreamExt;
1535    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1536    /// let tick = process.tick();
1537    /// // ticks are lazy by default, forces the second tick to run
1538    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1539    ///
1540    /// let batch_first_tick = process
1541    ///   .source_iter(q!(vec![1, 2]))
1542    ///   .batch(&tick, nondet!(/** test */));
1543    /// let batch_second_tick = process
1544    ///   .source_iter(q!(vec![3, 4]))
1545    ///   .batch(&tick, nondet!(/** test */))
1546    ///   .defer_tick(); // appears on the second tick
1547    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1548    ///   .reduce(q!(|state, v| *state += v));
1549    ///
1550    /// current_tick_sum.clone().into_singleton().zip(
1551    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1552    /// ).all_ticks()
1553    /// # }, |mut stream| async move {
1554    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1555    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1556    /// #     assert_eq!(stream.next().await.unwrap(), w);
1557    /// # }
1558    /// # }));
1559    /// # }
1560    /// ```
1561    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1562        Optional::new(
1563            self.location.clone(),
1564            HydroNode::DeferTick {
1565                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1566                metadata: self.location.new_node_metadata(Self::collection_kind()),
1567            },
1568        )
1569    }
1570}
1571
1572#[cfg(test)]
1573mod tests {
1574    #[cfg(feature = "deploy")]
1575    use futures::StreamExt;
1576    #[cfg(feature = "deploy")]
1577    use hydro_deploy::Deployment;
1578    #[cfg(any(feature = "deploy", feature = "sim"))]
1579    use stageleft::q;
1580
1581    #[cfg(feature = "deploy")]
1582    use super::Optional;
1583    #[cfg(any(feature = "deploy", feature = "sim"))]
1584    use crate::compile::builder::FlowBuilder;
1585    #[cfg(any(feature = "deploy", feature = "sim"))]
1586    use crate::location::Location;
1587    #[cfg(feature = "deploy")]
1588    use crate::nondet::nondet;
1589
1590    #[cfg(feature = "deploy")]
1591    #[tokio::test]
1592    async fn optional_or_cardinality() {
1593        let mut deployment = Deployment::new();
1594
1595        let mut flow = FlowBuilder::new();
1596        let node = flow.process::<()>();
1597        let external = flow.external::<()>();
1598
1599        let node_tick = node.tick();
1600        let tick_singleton = node_tick.singleton(q!(123));
1601        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1602        let counts = tick_optional_inhabited
1603            .clone()
1604            .or(tick_optional_inhabited)
1605            .into_stream()
1606            .count()
1607            .all_ticks()
1608            .send_bincode_external(&external);
1609
1610        let nodes = flow
1611            .with_process(&node, deployment.Localhost())
1612            .with_external(&external, deployment.Localhost())
1613            .deploy(&mut deployment);
1614
1615        deployment.deploy().await.unwrap();
1616
1617        let mut external_out = nodes.connect(counts).await;
1618
1619        deployment.start().await.unwrap();
1620
1621        assert_eq!(external_out.next().await.unwrap(), 1);
1622    }
1623
1624    #[cfg(feature = "deploy")]
1625    #[tokio::test]
1626    async fn into_singleton_top_level_none_cardinality() {
1627        let mut deployment = Deployment::new();
1628
1629        let mut flow = FlowBuilder::new();
1630        let node = flow.process::<()>();
1631        let external = flow.external::<()>();
1632
1633        let node_tick = node.tick();
1634        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1635        let into_singleton = top_level_none.into_singleton();
1636
1637        let tick_driver = node.spin();
1638
1639        let counts = into_singleton
1640            .snapshot(&node_tick, nondet!(/** test */))
1641            .into_stream()
1642            .count()
1643            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1644            .map(q!(|(c, _)| c))
1645            .all_ticks()
1646            .send_bincode_external(&external);
1647
1648        let nodes = flow
1649            .with_process(&node, deployment.Localhost())
1650            .with_external(&external, deployment.Localhost())
1651            .deploy(&mut deployment);
1652
1653        deployment.deploy().await.unwrap();
1654
1655        let mut external_out = nodes.connect(counts).await;
1656
1657        deployment.start().await.unwrap();
1658
1659        assert_eq!(external_out.next().await.unwrap(), 1);
1660        assert_eq!(external_out.next().await.unwrap(), 1);
1661        assert_eq!(external_out.next().await.unwrap(), 1);
1662    }
1663
1664    #[cfg(feature = "deploy")]
1665    #[tokio::test]
1666    async fn into_singleton_unbounded_top_level_none_cardinality() {
1667        let mut deployment = Deployment::new();
1668
1669        let mut flow = FlowBuilder::new();
1670        let node = flow.process::<()>();
1671        let external = flow.external::<()>();
1672
1673        let node_tick = node.tick();
1674        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1675        let into_singleton = top_level_none.into_singleton();
1676
1677        let tick_driver = node.spin();
1678
1679        let counts = into_singleton
1680            .snapshot(&node_tick, nondet!(/** test */))
1681            .into_stream()
1682            .count()
1683            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1684            .map(q!(|(c, _)| c))
1685            .all_ticks()
1686            .send_bincode_external(&external);
1687
1688        let nodes = flow
1689            .with_process(&node, deployment.Localhost())
1690            .with_external(&external, deployment.Localhost())
1691            .deploy(&mut deployment);
1692
1693        deployment.deploy().await.unwrap();
1694
1695        let mut external_out = nodes.connect(counts).await;
1696
1697        deployment.start().await.unwrap();
1698
1699        assert_eq!(external_out.next().await.unwrap(), 1);
1700        assert_eq!(external_out.next().await.unwrap(), 1);
1701        assert_eq!(external_out.next().await.unwrap(), 1);
1702    }
1703
1704    #[cfg(feature = "sim")]
1705    #[test]
1706    fn top_level_optional_some_into_stream_no_replay() {
1707        let mut flow = FlowBuilder::new();
1708        let node = flow.process::<()>();
1709
1710        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1711        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1712        let filtered_some = folded.filter(q!(|_| true));
1713
1714        let out_recv = filtered_some.into_stream().sim_output();
1715
1716        flow.sim().exhaustive(async || {
1717            out_recv.assert_yields_only([10]).await;
1718        });
1719    }
1720
1721    #[cfg(feature = "sim")]
1722    #[test]
1723    fn top_level_optional_none_into_stream_no_replay() {
1724        let mut flow = FlowBuilder::new();
1725        let node = flow.process::<()>();
1726
1727        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1728        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1729        let filtered_none = folded.filter(q!(|_| false));
1730
1731        let out_recv = filtered_none.into_stream().sim_output();
1732
1733        flow.sim().exhaustive(async || {
1734            out_recv.assert_yields_only([] as [i32; 0]).await;
1735        });
1736    }
1737
1738    #[cfg(feature = "deploy")]
1739    #[tokio::test]
1740    async fn test_optional_ref() {
1741        let mut deployment = Deployment::new();
1742
1743        let mut flow = FlowBuilder::new();
1744        let external = flow.external::<()>();
1745        let p1 = flow.process::<()>();
1746
1747        // Create an optional: reduce 0..5 => Some(10) (sum via reduce)
1748        let my_opt = p1.source_iter(q!(0..5i32)).reduce(q!(|a, b| *a += b));
1749
1750        let opt_ref = my_opt.by_ref();
1751
1752        // Use the optional ref in a map: unwrap_or(0) + x
1753        let out_port = p1
1754            .source_iter(q!(1..=3i32))
1755            .map(q!(|x| x + opt_ref.unwrap_or(0)))
1756            .send_bincode_external(&external);
1757
1758        let nodes = flow
1759            .with_default_optimize()
1760            .with_process(&p1, deployment.Localhost())
1761            .with_external(&external, deployment.Localhost())
1762            .deploy(&mut deployment);
1763
1764        deployment.deploy().await.unwrap();
1765
1766        let mut out_recv = nodes.connect(out_port).await;
1767
1768        deployment.start().await.unwrap();
1769
1770        let mut results = Vec::new();
1771        for _ in 0..3 {
1772            results.push(out_recv.next().await.unwrap());
1773        }
1774        results.sort();
1775        // reduce(0..5) = 10, so results should be 11, 12, 13
1776        assert_eq!(results, vec![11, 12, 13]);
1777    }
1778
1779    #[cfg(feature = "deploy")]
1780    #[tokio::test]
1781    async fn test_optional_ref_none() {
1782        let mut deployment = Deployment::new();
1783
1784        let mut flow = FlowBuilder::new();
1785        let external = flow.external::<()>();
1786        let p1 = flow.process::<()>();
1787
1788        // Create an optional from an empty source => None
1789        let my_opt = p1
1790            .source_iter(q!(std::iter::empty::<i32>()))
1791            .reduce(q!(|a, b| *a += b));
1792
1793        let opt_ref = my_opt.by_ref();
1794
1795        // Use the optional ref: should be None, so unwrap_or(99)
1796        let out_port = p1
1797            .source_iter(q!(1..=2i32))
1798            .map(q!(|x| x + opt_ref.unwrap_or(99)))
1799            .send_bincode_external(&external);
1800
1801        let nodes = flow
1802            .with_default_optimize()
1803            .with_process(&p1, deployment.Localhost())
1804            .with_external(&external, deployment.Localhost())
1805            .deploy(&mut deployment);
1806
1807        deployment.deploy().await.unwrap();
1808
1809        let mut out_recv = nodes.connect(out_port).await;
1810
1811        deployment.start().await.unwrap();
1812
1813        let mut results = Vec::new();
1814        for _ in 0..2 {
1815            results.push(out_recv.next().await.unwrap());
1816        }
1817        results.sort();
1818        // optional is None, so unwrap_or(99) => 100, 101
1819        assert_eq!(results, vec![100, 101]);
1820    }
1821
1822    #[cfg(feature = "deploy")]
1823    #[tokio::test]
1824    async fn test_optional_ref_and_consume() {
1825        let mut deployment = Deployment::new();
1826
1827        let mut flow = FlowBuilder::new();
1828        let external = flow.external::<()>();
1829        let p1 = flow.process::<()>();
1830
1831        // Use reduce to produce an Optional
1832        let my_opt = p1.source_iter(q!(0..5i32)).reduce(q!(|a, b| *a += b));
1833
1834        let opt_ref = my_opt.by_ref();
1835
1836        // Reference path
1837        let out_port_ref = p1
1838            .source_iter(q!(1..=2i32))
1839            .map(q!(|x| x + opt_ref.unwrap_or(0)))
1840            .send_bincode_external(&external);
1841
1842        let nodes = flow
1843            .with_default_optimize()
1844            .with_process(&p1, deployment.Localhost())
1845            .with_external(&external, deployment.Localhost())
1846            .deploy(&mut deployment);
1847
1848        deployment.deploy().await.unwrap();
1849
1850        let mut out_recv_ref = nodes.connect(out_port_ref).await;
1851
1852        deployment.start().await.unwrap();
1853
1854        let mut ref_results = Vec::new();
1855        for _ in 0..2 {
1856            ref_results.push(out_recv_ref.next().await.unwrap());
1857        }
1858        ref_results.sort();
1859        // reduce(0..5) = 10, so 1+10=11, 2+10=12
1860        assert_eq!(ref_results, vec![11, 12]);
1861    }
1862}