hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick};
23use crate::location::{Location, Tick, TopLevel, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25use crate::prelude::KeyedSingleton;
26use crate::properties::{StreamMapFuncAlgebra, ValidMutCommutativityFor, ValidMutIdempotenceFor};
27
28/// A *nullable* Rust value that can asynchronously change over time.
29///
30/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
31/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
33///
34/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
35/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
36///
37/// Type Parameters:
38/// - `Type`: the type of the value in this optional (when it is not null)
39/// - `Loc`: the [`Location`] where the optional is materialized
40/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    /// The location value stored with this optional (exposed through [`Optional::location`]).
    pub(crate) location: Loc,
    /// The IR node backing this optional. It lives in a [`RefCell`] so that code holding only
    /// `&self`/`&mut self` (such as `Drop` and `Clone`) can swap it for `HydroNode::Placeholder`.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow state handle; the `Drop` impl uses it to register a `HydroRoot::Null` root.
    pub(crate) flow_state: FlowState,

    /// Zero-sized marker that ties the `Type`, `Loc` and `Bound` parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
48
49impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
50 fn drop(&mut self) {
51 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
52 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
53 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
54 input: Box::new(ir_node),
55 op_metadata: HydroIrOpMetadata::new(),
56 });
57 }
58 }
59}
60
61impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
62where
63 T: Clone,
64 L: Location<'a>,
65{
66 fn from(value: Optional<T, L, Bounded>) -> Self {
67 let tick = value.location().tick();
68 value.clone_into_tick(&tick).latest()
69 }
70}
71
/// Implements [`DeferTick`] for bounded optionals that live inside a [`Tick`] by forwarding
/// to `Optional::defer_tick`.
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): presumably resolves to the inherent `Optional::defer_tick` defined
        // outside this view (inherent associated functions take precedence over trait
        // methods), so this does not recurse — confirm.
        Optional::defer_tick(self)
    }
}
80
81impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
82where
83 L: Location<'a>,
84{
85 type Location = Tick<L>;
86
87 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
88 Optional::new(
89 location.clone(),
90 HydroNode::CycleSource {
91 cycle_id,
92 metadata: location.new_node_metadata(Self::collection_kind()),
93 },
94 )
95 }
96}
97
98impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
99where
100 L: Location<'a>,
101{
102 type Location = Tick<L>;
103
104 fn location(&self) -> &Self::Location {
105 self.location()
106 }
107
108 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
109 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
110 location.clone(),
111 HydroNode::DeferTick {
112 input: Box::new(HydroNode::CycleSource {
113 cycle_id,
114 metadata: location.new_node_metadata(Self::collection_kind()),
115 }),
116 metadata: location
117 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
118 },
119 );
120
121 from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
122 }
123}
124
125impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
126where
127 L: Location<'a>,
128{
129 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
130 assert_eq!(
131 Location::id(&self.location),
132 expected_location,
133 "locations do not match"
134 );
135 self.location
136 .flow_state()
137 .borrow_mut()
138 .push_root(HydroRoot::CycleSink {
139 cycle_id,
140 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
141 op_metadata: HydroIrOpMetadata::new(),
142 });
143 }
144}
145
146impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
147where
148 L: Location<'a>,
149{
150 type Location = L;
151
152 fn create_source(cycle_id: CycleId, location: L) -> Self {
153 Optional::new(
154 location.clone(),
155 HydroNode::CycleSource {
156 cycle_id,
157 metadata: location.new_node_metadata(Self::collection_kind()),
158 },
159 )
160 }
161}
162
163impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
164where
165 L: Location<'a>,
166{
167 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
168 assert_eq!(
169 Location::id(&self.location),
170 expected_location,
171 "locations do not match"
172 );
173 self.location
174 .flow_state()
175 .borrow_mut()
176 .push_root(HydroRoot::CycleSink {
177 cycle_id,
178 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
179 op_metadata: HydroIrOpMetadata::new(),
180 });
181 }
182}
183
184impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
185where
186 L: Location<'a>,
187{
188 fn from(singleton: Singleton<T, L, B>) -> Self {
189 Optional::new(
190 singleton.location.clone(),
191 HydroNode::Cast {
192 inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
193 metadata: singleton
194 .location
195 .new_node_metadata(Self::collection_kind()),
196 },
197 )
198 }
199}
200
/// Pairs the values of two optionals at the same location through a
/// `HydroNode::CrossSingleton` node, without any tick handling of its own.
///
/// Both inputs are consumed; `check_matching_location` is run on their locations first.
#[cfg(stageleft_runtime)]
pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let out_location = me.location.clone();
    let left = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let right = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());

    Optional::new(
        out_location,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
219
/// Combines two optionals at the same location through a `HydroNode::ChainFirst` node, with
/// `me` as the first input and `other` as the second, without any tick handling of its own.
///
/// Both inputs are consumed; `check_matching_location` is run on their locations first.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let out_location = me.location.clone();
    let first = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let second = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());

    Optional::new(
        out_location,
        HydroNode::ChainFirst {
            first,
            second,
            metadata,
        },
    )
}
238
239impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
240where
241 T: Clone,
242 L: Location<'a>,
243{
244 fn clone(&self) -> Self {
245 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
246 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
247 *self.ir_node.borrow_mut() = HydroNode::Tee {
248 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
249 metadata: self.location.new_node_metadata(Self::collection_kind()),
250 };
251 }
252
253 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
254 Optional {
255 location: self.location.clone(),
256 flow_state: self.flow_state.clone(),
257 ir_node: HydroNode::Tee {
258 inner: SharedNode(inner.0.clone()),
259 metadata: metadata.clone(),
260 }
261 .into(),
262 _phantom: PhantomData,
263 }
264 } else {
265 unreachable!()
266 }
267 }
268}
269
270impl<'a, T, L, B: Boundedness> Optional<T, L, B>
271where
272 L: Location<'a>,
273{
274 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
275 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
276 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
277 let flow_state = location.flow_state().clone();
278 Optional {
279 location,
280 flow_state,
281 ir_node: RefCell::new(ir_node),
282 _phantom: PhantomData,
283 }
284 }
285
286 pub(crate) fn collection_kind() -> CollectionKind {
287 CollectionKind::Optional {
288 bound: B::BOUND_KIND,
289 element_type: stageleft::quote_type::<T>().into(),
290 }
291 }
292
/// Returns the [`Location`] where this optional is being materialized.
///
/// This borrows the location value stored in the optional; nothing is cloned.
pub fn location(&self) -> &L {
    &self.location
}
297
/// Creates a shared reference handle to this optional that can be captured inside `q!()`
/// closures. The handle resolves to `&Option<T>` at runtime.
///
/// The optional must be bounded (`B: IsBounded`), otherwise reading it would be
/// non-deterministic.
///
/// The returned handle is built from a borrow of this optional's IR node cell, so its
/// lifetime is tied to the `&self` borrow.
pub fn by_ref(&self) -> crate::handoff_ref::OptionalRef<'a, '_, T, L>
where
    B: IsBounded,
{
    crate::handoff_ref::OptionalRef::new(&self.ir_node)
}
308
/// Returns a mutable reference handle to this optional that can be captured inside `q!()`
/// closures. The handle resolves to `&mut Option<T>` at runtime.
///
/// As with [`Optional::by_ref`], the optional must be bounded (`B: IsBounded`).
///
/// The returned handle is built from a borrow of this optional's IR node cell, so its
/// lifetime is tied to the `&self` borrow.
pub fn by_mut(&self) -> crate::handoff_ref::OptionalMut<'a, '_, T, L>
where
    B: IsBounded,
{
    crate::handoff_ref::OptionalMut::new(&self.ir_node)
}
317
318 /// Weakens the consistency of this live collection to not guarantee any consistency across
319 /// cluster members (if this collection is on a cluster).
320 pub fn weaken_consistency(self) -> Optional<T, L::DropConsistency, B>
321 where
322 L: Location<'a>,
323 {
324 if L::consistency()
325 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
326 {
327 // already no consistency
328 Optional::new(
329 self.location.drop_consistency(),
330 self.ir_node.replace(HydroNode::Placeholder),
331 )
332 } else {
333 Optional::new(
334 self.location.drop_consistency(),
335 HydroNode::Cast {
336 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
337 metadata: self
338 .location
339 .clone()
340 .drop_consistency()
341 .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
342 },
343 )
344 }
345 }
346
347 /// Casts this live collection to have the consistency guarantees specified in the given
348 /// location type parameter. The developer must ensure that the strengthened consistency
349 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
350 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
351 self,
352 _proof: impl crate::properties::ConsistencyProof,
353 ) -> Optional<T, L2, B>
354 where
355 L: Location<'a>,
356 {
357 if L::consistency() == L2::consistency() {
358 Optional::new(
359 self.location.with_consistency_of(),
360 self.ir_node.replace(HydroNode::Placeholder),
361 )
362 } else {
363 Optional::new(
364 self.location.with_consistency_of(),
365 HydroNode::AssertIsConsistent {
366 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
367 trusted: false,
368 metadata: self
369 .location
370 .clone()
371 .with_consistency_of::<L2>()
372 .new_node_metadata(Optional::<T, L2, B>::collection_kind()),
373 },
374 )
375 }
376 }
377
378 /// Transforms the optional value by applying a function `f` to it,
379 /// continuously as the input is updated.
380 ///
381 /// Whenever the optional is empty, the output optional is also empty.
382 ///
383 /// # Example
384 /// ```rust
385 /// # #[cfg(feature = "deploy")] {
386 /// # use hydro_lang::prelude::*;
387 /// # use futures::StreamExt;
388 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
389 /// let tick = process.tick();
390 /// let optional = tick.optional_first_tick(q!(1));
391 /// optional.map(q!(|v| v + 1)).all_ticks()
392 /// # }, |mut stream| async move {
393 /// // 2
394 /// # assert_eq!(stream.next().await.unwrap(), 2);
395 /// # }));
396 /// # }
397 /// ```
398 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
399 where
400 F: Fn(T) -> U + 'a,
401 {
402 let f = f.splice_fn1_ctx(&self.location).into();
403 Optional::new(
404 self.location.clone(),
405 HydroNode::Map {
406 f,
407 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
408 metadata: self
409 .location
410 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
411 },
412 )
413 }
414
415 /// Transforms the optional value by applying a function `f` to it and then flattening
416 /// the result into a stream, preserving the order of elements.
417 ///
418 /// If the optional is empty, the output stream is also empty. If the optional contains
419 /// a value, `f` is applied to produce an iterator, and all items from that iterator
420 /// are emitted in the output stream in deterministic order.
421 ///
422 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
423 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
424 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
425 ///
426 /// # Example
427 /// ```rust
428 /// # #[cfg(feature = "deploy")] {
429 /// # use hydro_lang::prelude::*;
430 /// # use futures::StreamExt;
431 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
432 /// let tick = process.tick();
433 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
434 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
435 /// # }, |mut stream| async move {
436 /// // 1, 2, 3
437 /// # for w in vec![1, 2, 3] {
438 /// # assert_eq!(stream.next().await.unwrap(), w);
439 /// # }
440 /// # }));
441 /// # }
442 /// ```
443 pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
444 self,
445 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
446 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
447 where
448 B: IsBounded,
449 I: IntoIterator<Item = U>,
450 F: FnMut(T) -> I + 'a,
451 C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
452 Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
453 {
454 self.into_stream().flat_map_ordered(f)
455 }
456
457 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
458 /// for the output type `I` to produce items in any order.
459 ///
460 /// If the optional is empty, the output stream is also empty. If the optional contains
461 /// a value, `f` is applied to produce an iterator, and all items from that iterator
462 /// are emitted in the output stream in non-deterministic order.
463 ///
464 /// # Example
465 /// ```rust
466 /// # #[cfg(feature = "deploy")] {
467 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
468 /// # use futures::StreamExt;
469 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
470 /// let tick = process.tick();
471 /// let optional = tick.optional_first_tick(q!(
472 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
473 /// ));
474 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
475 /// # }, |mut stream| async move {
476 /// // 1, 2, 3, but in no particular order
477 /// # let mut results = Vec::new();
478 /// # for _ in 0..3 {
479 /// # results.push(stream.next().await.unwrap());
480 /// # }
481 /// # results.sort();
482 /// # assert_eq!(results, vec![1, 2, 3]);
483 /// # }));
484 /// # }
485 /// ```
486 pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
487 self,
488 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
489 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
490 where
491 B: IsBounded,
492 I: IntoIterator<Item = U>,
493 F: FnMut(T) -> I + 'a,
494 C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
495 Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
496 {
497 self.into_stream().flat_map_unordered(f)
498 }
499
500 /// Flattens the optional value into a stream, preserving the order of elements.
501 ///
502 /// If the optional is empty, the output stream is also empty. If the optional contains
503 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
504 /// in the output stream in deterministic order.
505 ///
506 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
507 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
508 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
509 ///
510 /// # Example
511 /// ```rust
512 /// # #[cfg(feature = "deploy")] {
513 /// # use hydro_lang::prelude::*;
514 /// # use futures::StreamExt;
515 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
516 /// let tick = process.tick();
517 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
518 /// optional.flatten_ordered().all_ticks()
519 /// # }, |mut stream| async move {
520 /// // 1, 2, 3
521 /// # for w in vec![1, 2, 3] {
522 /// # assert_eq!(stream.next().await.unwrap(), w);
523 /// # }
524 /// # }));
525 /// # }
526 /// ```
527 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
528 where
529 B: IsBounded,
530 T: IntoIterator<Item = U>,
531 {
532 self.flat_map_ordered(q!(|v| v))
533 }
534
535 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
536 /// for the element type `T` to produce items in any order.
537 ///
538 /// If the optional is empty, the output stream is also empty. If the optional contains
539 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
540 /// in the output stream in non-deterministic order.
541 ///
542 /// # Example
543 /// ```rust
544 /// # #[cfg(feature = "deploy")] {
545 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
546 /// # use futures::StreamExt;
547 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
548 /// let tick = process.tick();
549 /// let optional = tick.optional_first_tick(q!(
550 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
551 /// ));
552 /// optional.flatten_unordered().all_ticks()
553 /// # }, |mut stream| async move {
554 /// // 1, 2, 3, but in no particular order
555 /// # let mut results = Vec::new();
556 /// # for _ in 0..3 {
557 /// # results.push(stream.next().await.unwrap());
558 /// # }
559 /// # results.sort();
560 /// # assert_eq!(results, vec![1, 2, 3]);
561 /// # }));
562 /// # }
563 /// ```
564 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
565 where
566 B: IsBounded,
567 T: IntoIterator<Item = U>,
568 {
569 self.flat_map_unordered(q!(|v| v))
570 }
571
572 /// Creates an optional containing only the value if it satisfies a predicate `f`.
573 ///
574 /// If the optional is empty, the output optional is also empty. If the optional contains
575 /// a value and the predicate returns `true`, the output optional contains the same value.
576 /// If the predicate returns `false`, the output optional is empty.
577 ///
/// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
/// not modify or take ownership of the value. If you need to modify the value while filtering,
/// use [`Optional::filter_map`] instead.
581 ///
582 /// # Example
583 /// ```rust
584 /// # #[cfg(feature = "deploy")] {
585 /// # use hydro_lang::prelude::*;
586 /// # use futures::StreamExt;
587 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
588 /// let tick = process.tick();
589 /// let optional = tick.optional_first_tick(q!(5));
590 /// optional.filter(q!(|&x| x > 3)).all_ticks()
591 /// # }, |mut stream| async move {
592 /// // 5
593 /// # assert_eq!(stream.next().await.unwrap(), 5);
594 /// # }));
595 /// # }
596 /// ```
597 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
598 where
599 F: Fn(&T) -> bool + 'a,
600 {
601 let f = f.splice_fn1_borrow_ctx(&self.location).into();
602 Optional::new(
603 self.location.clone(),
604 HydroNode::Filter {
605 f,
606 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
607 metadata: self.location.new_node_metadata(Self::collection_kind()),
608 },
609 )
610 }
611
612 /// An operator that both filters and maps. It yields only the value if the supplied
613 /// closure `f` returns `Some(value)`.
614 ///
615 /// If the optional is empty, the output optional is also empty. If the optional contains
616 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
617 /// If the closure returns `None`, the output optional is empty.
618 ///
619 /// # Example
620 /// ```rust
621 /// # #[cfg(feature = "deploy")] {
622 /// # use hydro_lang::prelude::*;
623 /// # use futures::StreamExt;
624 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
625 /// let tick = process.tick();
626 /// let optional = tick.optional_first_tick(q!("42"));
627 /// optional
628 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
629 /// .all_ticks()
630 /// # }, |mut stream| async move {
631 /// // 42
632 /// # assert_eq!(stream.next().await.unwrap(), 42);
633 /// # }));
634 /// # }
635 /// ```
636 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
637 where
638 F: Fn(T) -> Option<U> + 'a,
639 {
640 let f = f.splice_fn1_ctx(&self.location).into();
641 Optional::new(
642 self.location.clone(),
643 HydroNode::FilterMap {
644 f,
645 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
646 metadata: self
647 .location
648 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
649 },
650 )
651 }
652
/// Combines this optional with another [`Singleton`] or [`Optional`] by tupling their values.
///
/// If the other value is an [`Optional`], the output will be non-null only if both this
/// optional and the argument are non-null. This is useful for combining several pieces of
/// state together.
657 ///
658 /// # Example
659 /// ```rust
660 /// # #[cfg(feature = "deploy")] {
661 /// # use hydro_lang::prelude::*;
662 /// # use futures::StreamExt;
663 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
664 /// let tick = process.tick();
665 /// let numbers = process
666 /// .source_iter(q!(vec![123, 456, 789]))
667 /// .batch(&tick, nondet!(/** test */));
668 /// let min = numbers.clone().min(); // Optional
669 /// let max = numbers.max(); // Optional
670 /// min.zip(max).all_ticks()
671 /// # }, |mut stream| async move {
672 /// // [(123, 789)]
673 /// # for w in vec![(123, 789)] {
674 /// # assert_eq!(stream.next().await.unwrap(), w);
675 /// # }
676 /// # }));
677 /// # }
678 /// ```
679 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
680 where
681 B: IsBounded,
682 {
683 let other: Optional<O, L, B> = other.into();
684 check_matching_location(&self.location, &other.location);
685
686 if L::is_top_level()
687 && let Some(tick) = self.location.try_tick()
688 {
689 let self_location = self.location().clone();
690 let out = zip_inside_tick(
691 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
692 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
693 )
694 .latest();
695
696 Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
697 } else {
698 zip_inside_tick(self, other)
699 }
700 }
701
702 /// Passes through `self` when it has a value, otherwise passes through `other`.
703 ///
704 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
705 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
706 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
707 ///
708 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
709 /// of the inputs change (including to/from null states).
710 ///
711 /// # Example
712 /// ```rust
713 /// # #[cfg(feature = "deploy")] {
714 /// # use hydro_lang::prelude::*;
715 /// # use futures::StreamExt;
716 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
717 /// let tick = process.tick();
718 /// // ticks are lazy by default, forces the second tick to run
719 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
720 ///
721 /// let some_first_tick = tick.optional_first_tick(q!(123));
722 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
723 /// some_first_tick.or(some_second_tick).all_ticks()
724 /// # }, |mut stream| async move {
725 /// // [123 /* first tick */, 456 /* second tick */]
726 /// # for w in vec![123, 456] {
727 /// # assert_eq!(stream.next().await.unwrap(), w);
728 /// # }
729 /// # }));
730 /// # }
731 /// ```
732 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
733 check_matching_location(&self.location, &other.location);
734
735 if L::is_top_level()
736 && !B::BOUNDED // only if unbounded we need to use a tick
737 && let Some(tick) = self.location.try_tick()
738 {
739 let self_location = self.location().clone();
740 let out = or_inside_tick(
741 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
742 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
743 )
744 .latest();
745
746 Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
747 } else {
748 Optional::new(
749 self.location.clone(),
750 HydroNode::ChainFirst {
751 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
752 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
753 metadata: self.location.new_node_metadata(Self::collection_kind()),
754 },
755 )
756 }
757 }
758
759 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
760 ///
761 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
762 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
763 ///
764 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
765 /// of the inputs change (including to/from null states).
766 ///
767 /// # Example
768 /// ```rust
769 /// # #[cfg(feature = "deploy")] {
770 /// # use hydro_lang::prelude::*;
771 /// # use futures::StreamExt;
772 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
773 /// let tick = process.tick();
774 /// // ticks are lazy by default, forces the later ticks to run
775 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
776 ///
777 /// let some_first_tick = tick.optional_first_tick(q!(123));
778 /// some_first_tick
779 /// .unwrap_or(tick.singleton(q!(456)))
780 /// .all_ticks()
781 /// # }, |mut stream| async move {
782 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
783 /// # for w in vec![123, 456, 456, 456] {
784 /// # assert_eq!(stream.next().await.unwrap(), w);
785 /// # }
786 /// # }));
787 /// # }
788 /// ```
789 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
790 let res_option = self.or(other.into());
791 Singleton::new(
792 res_option.location.clone(),
793 HydroNode::Cast {
794 inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
795 metadata: res_option
796 .location
797 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
798 },
799 )
800 }
801
802 /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
803 ///
804 /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
805 /// [`Optional`] when the default value of the type is a suitable fallback.
806 ///
807 /// # Example
808 /// ```rust
809 /// # #[cfg(feature = "deploy")] {
810 /// # use hydro_lang::prelude::*;
811 /// # use futures::StreamExt;
812 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
813 /// let tick = process.tick();
814 /// // ticks are lazy by default, forces the later ticks to run
815 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
816 ///
817 /// let some_first_tick = tick.optional_first_tick(q!(123i32));
818 /// some_first_tick.unwrap_or_default().all_ticks()
819 /// # }, |mut stream| async move {
820 /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
821 /// # for w in vec![123, 0, 0, 0] {
822 /// # assert_eq!(stream.next().await.unwrap(), w);
823 /// # }
824 /// # }));
825 /// # }
826 /// ```
827 pub fn unwrap_or_default(self) -> Singleton<T, L, B>
828 where
829 T: Default + Clone,
830 {
831 self.into_singleton().map(q!(|v| v.unwrap_or_default()))
832 }
833
834 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
835 ///
836 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
837 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
838 /// so that Hydro can skip any computation on null values.
839 ///
840 /// # Example
841 /// ```rust
842 /// # #[cfg(feature = "deploy")] {
843 /// # use hydro_lang::prelude::*;
844 /// # use futures::StreamExt;
845 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
846 /// let tick = process.tick();
847 /// // ticks are lazy by default, forces the later ticks to run
848 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
849 ///
850 /// let some_first_tick = tick.optional_first_tick(q!(123));
851 /// some_first_tick.into_singleton().all_ticks()
852 /// # }, |mut stream| async move {
853 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
854 /// # for w in vec![Some(123), None, None, None] {
855 /// # assert_eq!(stream.next().await.unwrap(), w);
856 /// # }
857 /// # }));
858 /// # }
859 /// ```
860 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
861 where
862 T: Clone,
863 {
864 let none: syn::Expr = parse_quote!(::std::option::Option::None);
865
866 let none_singleton = Singleton::new(
867 self.location.clone(),
868 HydroNode::SingletonSource {
869 value: none.into(),
870 first_tick_only: false,
871 metadata: self
872 .location
873 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
874 },
875 );
876
877 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
878 }
879
880 /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
881 ///
882 /// # Example
883 /// ```rust
884 /// # #[cfg(feature = "deploy")] {
885 /// # use hydro_lang::prelude::*;
886 /// # use futures::StreamExt;
887 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
888 /// let tick = process.tick();
889 /// // ticks are lazy by default, forces the second tick to run
890 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
891 ///
892 /// let some_first_tick = tick.optional_first_tick(q!(42));
893 /// some_first_tick.is_some().all_ticks()
894 /// # }, |mut stream| async move {
895 /// // [true /* first tick */, false /* second tick */, ...]
896 /// # for w in vec![true, false] {
897 /// # assert_eq!(stream.next().await.unwrap(), w);
898 /// # }
899 /// # }));
900 /// # }
901 /// ```
902 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
903 pub fn is_some(self) -> Singleton<bool, L, B> {
904 self.map(q!(|_| ()))
905 .into_singleton()
906 .map(q!(|o| o.is_some()))
907 }
908
909 /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
910 ///
911 /// # Example
912 /// ```rust
913 /// # #[cfg(feature = "deploy")] {
914 /// # use hydro_lang::prelude::*;
915 /// # use futures::StreamExt;
916 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
917 /// let tick = process.tick();
918 /// // ticks are lazy by default, forces the second tick to run
919 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
920 ///
921 /// let some_first_tick = tick.optional_first_tick(q!(42));
922 /// some_first_tick.is_none().all_ticks()
923 /// # }, |mut stream| async move {
924 /// // [false /* first tick */, true /* second tick */, ...]
925 /// # for w in vec![false, true] {
926 /// # assert_eq!(stream.next().await.unwrap(), w);
927 /// # }
928 /// # }));
929 /// # }
930 /// ```
931 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
932 pub fn is_none(self) -> Singleton<bool, L, B> {
933 self.map(q!(|_| ()))
934 .into_singleton()
935 .map(q!(|o| o.is_none()))
936 }
937
938 /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
939 /// values are equal, `false` otherwise (including when either is null).
940 ///
941 /// # Example
942 /// ```rust
943 /// # #[cfg(feature = "deploy")] {
944 /// # use hydro_lang::prelude::*;
945 /// # use futures::StreamExt;
946 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
947 /// let tick = process.tick();
948 /// // ticks are lazy by default, forces the second tick to run
949 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
950 ///
951 /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
952 /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
953 /// a.is_some_and_equals(b).all_ticks()
954 /// # }, |mut stream| async move {
955 /// // [true, false]
956 /// # for w in vec![true, false] {
957 /// # assert_eq!(stream.next().await.unwrap(), w);
958 /// # }
959 /// # }));
960 /// # }
961 /// ```
962 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
963 pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
964 where
965 T: PartialEq + Clone,
966 B: IsBounded,
967 {
968 self.into_singleton()
969 .zip(other.into_singleton())
970 .map(q!(|(a, b)| a.is_some() && a == b))
971 }
972
973 /// An operator which allows you to "name" a `HydroNode`.
974 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
975 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
976 {
977 let mut node = self.ir_node.borrow_mut();
978 let metadata = node.metadata_mut();
979 metadata.tag = Some(name.to_owned());
980 }
981 self
982 }
983
984 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
985 /// implies that `B == Bounded`.
986 pub fn make_bounded(self) -> Optional<T, L, Bounded>
987 where
988 B: IsBounded,
989 {
990 Optional::new(
991 self.location.clone(),
992 self.ir_node.replace(HydroNode::Placeholder),
993 )
994 }
995
996 /// Clones this bounded optional into a tick, returning a optional that has the
997 /// same value as the outer optional. Because the outer optional is bounded, this
998 /// is deterministic because there is only a single immutable version.
999 pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
1000 where
1001 B: IsBounded,
1002 T: Clone,
1003 {
1004 // TODO(shadaj): avoid printing simulator logs for this snapshot
1005 let inner = self.snapshot(
1006 tick,
1007 nondet!(/** bounded top-level optional so deterministic */),
1008 );
1009 Optional::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1010 }
1011
1012 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1013 /// non-null. Otherwise, the stream is empty.
1014 ///
1015 /// # Example
1016 /// ```rust
1017 /// # #[cfg(feature = "deploy")] {
1018 /// # use hydro_lang::prelude::*;
1019 /// # use futures::StreamExt;
1020 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1021 /// # let tick = process.tick();
1022 /// # // ticks are lazy by default, forces the second tick to run
1023 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1024 /// # let batch_first_tick = process
1025 /// # .source_iter(q!(vec![]))
1026 /// # .batch(&tick, nondet!(/** test */));
1027 /// # let batch_second_tick = process
1028 /// # .source_iter(q!(vec![123, 456]))
1029 /// # .batch(&tick, nondet!(/** test */))
1030 /// # .defer_tick(); // appears on the second tick
1031 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1032 /// input_batch // first tick: [], second tick: [123, 456]
1033 /// .clone()
1034 /// .max()
1035 /// .into_stream()
1036 /// .chain(input_batch)
1037 /// .all_ticks()
1038 /// # }, |mut stream| async move {
1039 /// // [456, 123, 456]
1040 /// # for w in vec![456, 123, 456] {
1041 /// # assert_eq!(stream.next().await.unwrap(), w);
1042 /// # }
1043 /// # }));
1044 /// # }
1045 /// ```
1046 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1047 where
1048 B: IsBounded,
1049 {
1050 Stream::new(
1051 self.location.clone(),
1052 HydroNode::Cast {
1053 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1054 metadata: self.location.new_node_metadata(Stream::<
1055 T,
1056 Tick<L>,
1057 Bounded,
1058 TotalOrder,
1059 ExactlyOnce,
1060 >::collection_kind()),
1061 },
1062 )
1063 }
1064
1065 /// Filters this optional, passing through the value if the boolean signal is `true`,
1066 /// otherwise the output is null.
1067 ///
1068 /// # Example
1069 /// ```rust
1070 /// # #[cfg(feature = "deploy")] {
1071 /// # use hydro_lang::prelude::*;
1072 /// # use futures::StreamExt;
1073 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1074 /// let tick = process.tick();
1075 /// // ticks are lazy by default, forces the second tick to run
1076 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1077 ///
1078 /// let some_first_tick = tick.optional_first_tick(q!(()));
1079 /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1080 /// let batch_first_tick = process
1081 /// .source_iter(q!(vec![456]))
1082 /// .batch(&tick, nondet!(/** test */));
1083 /// let batch_second_tick = process
1084 /// .source_iter(q!(vec![789]))
1085 /// .batch(&tick, nondet!(/** test */))
1086 /// .defer_tick();
1087 /// batch_first_tick.chain(batch_second_tick).first()
1088 /// .filter_if(signal)
1089 /// .unwrap_or(tick.singleton(q!(0)))
1090 /// .all_ticks()
1091 /// # }, |mut stream| async move {
1092 /// // [456, 0]
1093 /// # for w in vec![456, 0] {
1094 /// # assert_eq!(stream.next().await.unwrap(), w);
1095 /// # }
1096 /// # }));
1097 /// # }
1098 /// ```
1099 pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1100 where
1101 B: IsBounded,
1102 {
1103 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1104 }
1105
1106 /// Filters this optional, passing through the optional value if it is non-null **and** the
1107 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1108 ///
1109 /// Useful for conditionally processing, such as only emitting an optional's value outside
1110 /// a tick if some other condition is satisfied.
1111 ///
1112 /// # Example
1113 /// ```rust
1114 /// # #[cfg(feature = "deploy")] {
1115 /// # use hydro_lang::prelude::*;
1116 /// # use futures::StreamExt;
1117 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1118 /// let tick = process.tick();
1119 /// // ticks are lazy by default, forces the second tick to run
1120 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1121 ///
1122 /// let batch_first_tick = process
1123 /// .source_iter(q!(vec![]))
1124 /// .batch(&tick, nondet!(/** test */));
1125 /// let batch_second_tick = process
1126 /// .source_iter(q!(vec![456]))
1127 /// .batch(&tick, nondet!(/** test */))
1128 /// .defer_tick(); // appears on the second tick
1129 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1130 /// batch_first_tick.chain(batch_second_tick).first()
1131 /// .filter_if_some(some_on_first_tick)
1132 /// .unwrap_or(tick.singleton(q!(789)))
1133 /// .all_ticks()
1134 /// # }, |mut stream| async move {
1135 /// // [789, 789]
1136 /// # for w in vec![789, 789] {
1137 /// # assert_eq!(stream.next().await.unwrap(), w);
1138 /// # }
1139 /// # }));
1140 /// # }
1141 /// ```
1142 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1143 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1144 where
1145 B: IsBounded,
1146 {
1147 self.filter_if(signal.is_some())
1148 }
1149
1150 /// Filters this optional, passing through the optional value if it is non-null **and** the
1151 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1152 ///
1153 /// Useful for conditionally processing, such as only emitting an optional's value outside
1154 /// a tick if some other condition is satisfied.
1155 ///
1156 /// # Example
1157 /// ```rust
1158 /// # #[cfg(feature = "deploy")] {
1159 /// # use hydro_lang::prelude::*;
1160 /// # use futures::StreamExt;
1161 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1162 /// let tick = process.tick();
1163 /// // ticks are lazy by default, forces the second tick to run
1164 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1165 ///
1166 /// let batch_first_tick = process
1167 /// .source_iter(q!(vec![]))
1168 /// .batch(&tick, nondet!(/** test */));
1169 /// let batch_second_tick = process
1170 /// .source_iter(q!(vec![456]))
1171 /// .batch(&tick, nondet!(/** test */))
1172 /// .defer_tick(); // appears on the second tick
1173 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1174 /// batch_first_tick.chain(batch_second_tick).first()
1175 /// .filter_if_none(some_on_first_tick)
1176 /// .unwrap_or(tick.singleton(q!(789)))
1177 /// .all_ticks()
1178 /// # }, |mut stream| async move {
1179 /// // [789, 789]
1180 /// # for w in vec![789, 456] {
1181 /// # assert_eq!(stream.next().await.unwrap(), w);
1182 /// # }
1183 /// # }));
1184 /// # }
1185 /// ```
1186 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1187 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1188 where
1189 B: IsBounded,
1190 {
1191 self.filter_if(other.is_none())
1192 }
1193
1194 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1195 ///
1196 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1197 /// having a value, such as only releasing a piece of state if the node is the leader.
1198 ///
1199 /// # Example
1200 /// ```rust
1201 /// # #[cfg(feature = "deploy")] {
1202 /// # use hydro_lang::prelude::*;
1203 /// # use futures::StreamExt;
1204 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1205 /// let tick = process.tick();
1206 /// // ticks are lazy by default, forces the second tick to run
1207 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1208 ///
1209 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1210 /// some_on_first_tick
1211 /// .if_some_then(tick.singleton(q!(456)))
1212 /// .unwrap_or(tick.singleton(q!(123)))
1213 /// # .all_ticks()
1214 /// # }, |mut stream| async move {
1215 /// // 456 (first tick) ~> 123 (second tick onwards)
1216 /// # for w in vec![456, 123, 123] {
1217 /// # assert_eq!(stream.next().await.unwrap(), w);
1218 /// # }
1219 /// # }));
1220 /// # }
1221 /// ```
1222 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1223 pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1224 where
1225 B: IsBounded,
1226 {
1227 value.filter_if(self.is_some())
1228 }
1229}
1230
1231impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1232where
1233 L: Location<'a>,
1234{
1235 /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1236 /// key-value pair of this [`Optional`].
1237 ///
1238 /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1239 /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1240 /// the entry will be updated and appear / disappear according to the state of the
1241 /// [`Optional`].
1242 pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1243 KeyedSingleton::new(
1244 self.location.clone(),
1245 HydroNode::Cast {
1246 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1247 metadata: self
1248 .location
1249 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1250 },
1251 )
1252 }
1253}
1254
1255impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1256where
1257 L: Location<'a>,
1258{
1259 /// Returns an optional value corresponding to the latest snapshot of the optional
1260 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1261 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1262 /// all snapshots of this optional into the atomic-associated tick will observe the
1263 /// same value each tick.
1264 ///
1265 /// # Non-Determinism
1266 /// Because this picks a snapshot of a optional whose value is continuously changing,
1267 /// the output optional has a non-deterministic value since the snapshot can be at an
1268 /// arbitrary point in time.
1269 pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1270 self,
1271 tick: &Tick<L2>,
1272 _nondet: NonDet,
1273 ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1274 Optional::new(
1275 tick.drop_consistency(),
1276 HydroNode::Batch {
1277 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1278 metadata: tick
1279 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1280 },
1281 )
1282 }
1283
1284 /// Returns this optional back into a top-level, asynchronous execution context where updates
1285 /// to the value will be asynchronously propagated.
1286 pub fn end_atomic(self) -> Optional<T, L, B> {
1287 Optional::new(
1288 self.location.tick.l.clone(),
1289 HydroNode::EndAtomic {
1290 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1291 metadata: self
1292 .location
1293 .tick
1294 .l
1295 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1296 },
1297 )
1298 }
1299}
1300
1301impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1302where
1303 L: Location<'a>,
1304{
1305 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1306 /// will observe the same version of the value and will be executed synchronously before any
1307 /// outputs are yielded (in [`Optional::end_atomic`]).
1308 ///
1309 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1310 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1311 /// a different version).
1312 pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1313 let id = self.location.flow_state().borrow_mut().next_clock_id();
1314 let out_location = Atomic {
1315 tick: Tick {
1316 id,
1317 l: self.location.clone(),
1318 },
1319 };
1320 Optional::new(
1321 out_location.clone(),
1322 HydroNode::BeginAtomic {
1323 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1324 metadata: out_location
1325 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1326 },
1327 )
1328 }
1329
1330 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1331 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1332 /// relevant data that contributed to the snapshot at tick `t`.
1333 ///
1334 /// # Non-Determinism
1335 /// Because this picks a snapshot of a optional whose value is continuously changing,
1336 /// the output optional has a non-deterministic value since the snapshot can be at an
1337 /// arbitrary point in time.
1338 pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1339 self,
1340 tick: &Tick<L2>,
1341 _nondet: NonDet,
1342 ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1343 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1344 Optional::new(
1345 tick.drop_consistency(),
1346 HydroNode::Batch {
1347 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1348 metadata: tick
1349 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1350 },
1351 )
1352 }
1353
1354 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1355 /// with order corresponding to increasing prefixes of data contributing to the optional.
1356 ///
1357 /// # Non-Determinism
1358 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1359 /// to non-deterministic batching and arrival of inputs, the output stream is
1360 /// non-deterministic.
1361 pub fn sample_eager(
1362 self,
1363 nondet: NonDet,
1364 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1365 let tick = self.location.tick();
1366 self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1367 }
1368
1369 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1370 /// value taken at various points in time. Because the input optional may be
1371 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1372 /// represent the value of the optional given some prefix of the streams leading up to
1373 /// it.
1374 ///
1375 /// # Non-Determinism
1376 /// The output stream is non-deterministic in which elements are sampled, since this
1377 /// is controlled by a clock.
1378 pub fn sample_every(
1379 self,
1380 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1381 nondet: NonDet,
1382 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1383 where
1384 L: TopLevel<'a>,
1385 {
1386 let samples = self.location.source_interval(interval);
1387 let tick = self.location.tick();
1388
1389 self.snapshot(&tick, nondet)
1390 .filter_if(samples.batch(&tick, nondet).first().is_some())
1391 .all_ticks()
1392 .weaken_retries()
1393 }
1394}
1395
1396impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1397where
1398 L: Location<'a>,
1399{
1400 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1401 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1402 /// null values).
1403 ///
1404 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1405 /// producing one element in the output for each (non-null) tick. This is useful for batched
1406 /// computations, where the results from each tick must be combined together.
1407 ///
1408 /// # Example
1409 /// ```rust
1410 /// # #[cfg(feature = "deploy")] {
1411 /// # use hydro_lang::prelude::*;
1412 /// # use futures::StreamExt;
1413 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1414 /// # let tick = process.tick();
1415 /// # // ticks are lazy by default, forces the second tick to run
1416 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1417 /// # let batch_first_tick = process
1418 /// # .source_iter(q!(vec![]))
1419 /// # .batch(&tick, nondet!(/** test */));
1420 /// # let batch_second_tick = process
1421 /// # .source_iter(q!(vec![1, 2, 3]))
1422 /// # .batch(&tick, nondet!(/** test */))
1423 /// # .defer_tick(); // appears on the second tick
1424 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1425 /// input_batch // first tick: [], second tick: [1, 2, 3]
1426 /// .max()
1427 /// .all_ticks()
1428 /// # }, |mut stream| async move {
1429 /// // [3]
1430 /// # for w in vec![3] {
1431 /// # assert_eq!(stream.next().await.unwrap(), w);
1432 /// # }
1433 /// # }));
1434 /// # }
1435 /// ```
1436 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1437 self.into_stream().all_ticks()
1438 }
1439
1440 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1441 /// which will stream the value computed in _each_ tick as a separate stream element.
1442 ///
1443 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1444 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1445 /// optional's [`Tick`] context.
1446 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1447 self.into_stream().all_ticks_atomic()
1448 }
1449
1450 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1451 /// be asynchronously updated with the latest value of the optional inside the tick, including
1452 /// whether the optional is null or not.
1453 ///
1454 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1455 /// tick that tracks the inner value. This is useful for getting the value as of the
1456 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1457 ///
1458 /// # Example
1459 /// ```rust
1460 /// # #[cfg(feature = "deploy")] {
1461 /// # use hydro_lang::prelude::*;
1462 /// # use futures::StreamExt;
1463 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1464 /// # let tick = process.tick();
1465 /// # // ticks are lazy by default, forces the second tick to run
1466 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1467 /// # let batch_first_tick = process
1468 /// # .source_iter(q!(vec![]))
1469 /// # .batch(&tick, nondet!(/** test */));
1470 /// # let batch_second_tick = process
1471 /// # .source_iter(q!(vec![1, 2, 3]))
1472 /// # .batch(&tick, nondet!(/** test */))
1473 /// # .defer_tick(); // appears on the second tick
1474 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1475 /// input_batch // first tick: [], second tick: [1, 2, 3]
1476 /// .max()
1477 /// .latest()
1478 /// # .into_singleton()
1479 /// # .sample_eager(nondet!(/** test */))
1480 /// # }, |mut stream| async move {
1481 /// // asynchronously changes from None ~> 3
1482 /// # for w in vec![None, Some(3)] {
1483 /// # assert_eq!(stream.next().await.unwrap(), w);
1484 /// # }
1485 /// # }));
1486 /// # }
1487 /// ```
1488 pub fn latest(self) -> Optional<T, L, Unbounded> {
1489 Optional::new(
1490 self.location.outer().clone(),
1491 HydroNode::YieldConcat {
1492 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1493 metadata: self
1494 .location
1495 .outer()
1496 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1497 },
1498 )
1499 }
1500
1501 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1502 /// be updated with the latest value of the optional inside the tick.
1503 ///
1504 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1505 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1506 /// optional's [`Tick`] context.
1507 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1508 let out_location = Atomic {
1509 tick: self.location.clone(),
1510 };
1511
1512 Optional::new(
1513 out_location.clone(),
1514 HydroNode::YieldConcat {
1515 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1516 metadata: out_location
1517 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1518 },
1519 )
1520 }
1521
1522 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1523 /// always has the state of `self` at tick `T - 1`.
1524 ///
1525 /// At tick `0`, the output optional is null, since there is no previous tick.
1526 ///
1527 /// This operator enables stateful iterative processing with ticks, by sending data from one
1528 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1529 ///
1530 /// # Example
1531 /// ```rust
1532 /// # #[cfg(feature = "deploy")] {
1533 /// # use hydro_lang::prelude::*;
1534 /// # use futures::StreamExt;
1535 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1536 /// let tick = process.tick();
1537 /// // ticks are lazy by default, forces the second tick to run
1538 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1539 ///
1540 /// let batch_first_tick = process
1541 /// .source_iter(q!(vec![1, 2]))
1542 /// .batch(&tick, nondet!(/** test */));
1543 /// let batch_second_tick = process
1544 /// .source_iter(q!(vec![3, 4]))
1545 /// .batch(&tick, nondet!(/** test */))
1546 /// .defer_tick(); // appears on the second tick
1547 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1548 /// .reduce(q!(|state, v| *state += v));
1549 ///
1550 /// current_tick_sum.clone().into_singleton().zip(
1551 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1552 /// ).all_ticks()
1553 /// # }, |mut stream| async move {
1554 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1555 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1556 /// # assert_eq!(stream.next().await.unwrap(), w);
1557 /// # }
1558 /// # }));
1559 /// # }
1560 /// ```
1561 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1562 Optional::new(
1563 self.location.clone(),
1564 HydroNode::DeferTick {
1565 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1566 metadata: self.location.new_node_metadata(Self::collection_kind()),
1567 },
1568 )
1569 }
1570}
1571
1572#[cfg(test)]
1573mod tests {
1574 #[cfg(feature = "deploy")]
1575 use futures::StreamExt;
1576 #[cfg(feature = "deploy")]
1577 use hydro_deploy::Deployment;
1578 #[cfg(any(feature = "deploy", feature = "sim"))]
1579 use stageleft::q;
1580
1581 #[cfg(feature = "deploy")]
1582 use super::Optional;
1583 #[cfg(any(feature = "deploy", feature = "sim"))]
1584 use crate::compile::builder::FlowBuilder;
1585 #[cfg(any(feature = "deploy", feature = "sim"))]
1586 use crate::location::Location;
1587 #[cfg(feature = "deploy")]
1588 use crate::nondet::nondet;
1589
#[cfg(feature = "deploy")]
#[tokio::test]
async fn optional_or_cardinality() {
    // `or`-ing an inhabited optional with itself is expected to yield exactly one element.
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let node_tick = node.tick();
    let tick_optional_inhabited: Optional<_, _, _> = node_tick.singleton(q!(123)).into();
    let merged = tick_optional_inhabited.clone().or(tick_optional_inhabited);
    let counts = merged
        .into_stream()
        .count()
        .all_ticks()
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut external_out = nodes.connect(counts).await;

    deployment.start().await.unwrap();

    let first_count = external_out.next().await.unwrap();
    assert_eq!(first_count, 1);
}
1623
#[cfg(feature = "deploy")]
#[tokio::test]
async fn into_singleton_top_level_none_cardinality() {
    // A top-level optional that is always null should still surface as exactly one
    // (`None`) singleton value in every observed tick.
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let node_tick = node.tick();
    let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
    let into_singleton = top_level_none.into_singleton();

    let tick_driver = node.spin();

    // Number of values the singleton exposes inside a tick.
    let singleton_count = into_singleton
        .snapshot(&node_tick, nondet!(/** test */))
        .into_stream()
        .count();
    // Zipped in so that the spinning source is part of the per-tick computation.
    let driver_count = tick_driver.batch(&node_tick, nondet!(/** test */)).count();

    let counts = singleton_count
        .zip(driver_count)
        .map(q!(|(c, _)| c))
        .all_ticks()
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut external_out = nodes.connect(counts).await;

    deployment.start().await.unwrap();

    // Check the first three reported counts.
    for _ in 0..3 {
        assert_eq!(external_out.next().await.unwrap(), 1);
    }
}
1663
#[cfg(feature = "deploy")]
#[tokio::test]
async fn into_singleton_unbounded_top_level_none_cardinality() {
    // Same check as the bounded variant, but the null optional is derived from `latest()`
    // of a tick singleton before being filtered to null.
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let node_tick = node.tick();
    let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
    let into_singleton = top_level_none.into_singleton();

    let tick_driver = node.spin();

    // Number of values the singleton exposes inside a tick.
    let singleton_count = into_singleton
        .snapshot(&node_tick, nondet!(/** test */))
        .into_stream()
        .count();
    // Zipped in so that the spinning source is part of the per-tick computation.
    let driver_count = tick_driver.batch(&node_tick, nondet!(/** test */)).count();

    let counts = singleton_count
        .zip(driver_count)
        .map(q!(|(c, _)| c))
        .all_ticks()
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut external_out = nodes.connect(counts).await;

    deployment.start().await.unwrap();

    // Check the first three reported counts.
    for _ in 0..3 {
        assert_eq!(external_out.next().await.unwrap(), 1);
    }
}
1703
#[cfg(feature = "sim")]
#[test]
fn top_level_optional_some_into_stream_no_replay() {
    // An inhabited top-level optional converted to a stream must yield its value once only.
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    // 1 + 2 + 3 + 4 = 10, kept by the always-true filter.
    let filtered_some = node
        .source_iter(q!(vec![1, 2, 3, 4]))
        .fold(q!(|| 0), q!(|a, b| *a += b))
        .filter(q!(|_| true));

    let out_recv = filtered_some.into_stream().sim_output();

    flow.sim().exhaustive(async || {
        out_recv.assert_yields_only([10]).await;
    });
}
1720
#[cfg(feature = "sim")]
#[test]
fn top_level_optional_none_into_stream_no_replay() {
    // A null top-level optional converted to a stream must yield nothing at all.
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    // The always-false filter discards the folded sum, leaving a null optional.
    let filtered_none = node
        .source_iter(q!(vec![1, 2, 3, 4]))
        .fold(q!(|| 0), q!(|a, b| *a += b))
        .filter(q!(|_| false));

    let out_recv = filtered_none.into_stream().sim_output();

    flow.sim().exhaustive(async || {
        out_recv.assert_yields_only([] as [i32; 0]).await;
    });
}
1737
#[cfg(feature = "deploy")]
#[tokio::test]
async fn test_optional_ref() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let external = flow.external::<()>();
    let p1 = flow.process::<()>();

    // Inhabited optional: reducing 0..5 by summation gives Some(10).
    let my_opt = p1.source_iter(q!(0..5i32)).reduce(q!(|a, b| *a += b));

    let opt_ref = my_opt.by_ref();

    // Reference the optional from inside a map closure: each input gets the sum added.
    let out_port = p1
        .source_iter(q!(1..=3i32))
        .map(q!(|x| x + opt_ref.unwrap_or(0)))
        .send_bincode_external(&external);

    let nodes = flow
        .with_default_optimize()
        .with_process(&p1, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut out_recv = nodes.connect(out_port).await;

    deployment.start().await.unwrap();

    // Collect the three outputs and sort them before comparing.
    let mut results = Vec::with_capacity(3);
    for _ in 0..3 {
        let value = out_recv.next().await.unwrap();
        results.push(value);
    }
    results.sort_unstable();

    // reduce(0..5) = 10, so results should be 11, 12, 13
    assert_eq!(results, vec![11, 12, 13]);
}
1778
/// Deployment test: a reference taken with `by_ref()` on an empty optional falls back
/// to the `unwrap_or` default when read inside a `map` closure.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn test_optional_ref_none() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let external = flow.external::<()>();
    let p1 = flow.process::<()>();

    // Reducing an empty source leaves nothing to combine, so the optional stays empty.
    let empty_source = p1.source_iter(q!(std::iter::empty::<i32>()));
    let never_set = empty_source.reduce(q!(|a, b| *a += b));
    let opt_ref = never_set.by_ref();

    // With no value behind the reference, each of 1, 2 is offset by the fallback 99.
    let shifted = p1
        .source_iter(q!(1..=2i32))
        .map(q!(|x| x + opt_ref.unwrap_or(99)));
    let out_port = shifted.send_bincode_external(&external);

    let nodes = flow
        .with_default_optimize()
        .with_process(&p1, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut receiver = nodes.connect(out_port).await;

    deployment.start().await.unwrap();

    // 1 + 99, 2 + 99.
    let expected = vec![100, 101];
    let mut received = Vec::new();
    while received.len() < expected.len() {
        received.push(receiver.next().await.unwrap());
    }
    // Sort before comparing so the assertion does not depend on arrival order.
    received.sort_unstable();
    assert_eq!(received, expected);
}
1821
/// Deployment test: reads a `reduce`-produced optional through a `by_ref()` reference
/// inside a `map` closure and checks the values sent to the external.
///
/// NOTE(review): despite the name, only the reference path is exercised here; the
/// optional itself is never consumed after `by_ref()`. Confirm whether a consume path
/// was intended.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn test_optional_ref_and_consume() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let external = flow.external::<()>();
    let p1 = flow.process::<()>();

    // 0 + 1 + 2 + 3 + 4 reduces to 10, so the optional is populated.
    let summed = p1.source_iter(q!(0..5i32)).reduce(q!(|a, b| *a += b));
    let opt_ref = summed.by_ref();

    // Reference path: offset each of 1, 2 by the referenced value.
    let shifted = p1
        .source_iter(q!(1..=2i32))
        .map(q!(|x| x + opt_ref.unwrap_or(0)));
    let out_port_ref = shifted.send_bincode_external(&external);

    let nodes = flow
        .with_default_optimize()
        .with_process(&p1, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut receiver_ref = nodes.connect(out_port_ref).await;

    deployment.start().await.unwrap();

    // 1 + 10, 2 + 10.
    let expected = vec![11, 12];
    let mut ref_received = Vec::new();
    while ref_received.len() < expected.len() {
        ref_received.push(receiver_ref.next().await.unwrap());
    }
    // Sort before comparing so the assertion does not depend on arrival order.
    ref_received.sort_unstable();
    assert_eq!(ref_received, expected);
}
1862}