hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::NonDet;
23
24/// A *nullable* Rust value that can asynchronously change over time.
25///
26/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
27/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
28/// asynchronously change over time, including becoming present of uninhabited.
29///
30/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
31/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this optional (when it is not null)
35/// - `Loc`: the [`Location`] where the optional is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Optional<Type, Loc, Bound: Boundedness> {
38 pub(crate) location: Loc,
39 pub(crate) ir_node: RefCell<HydroNode>,
40
41 _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
45where
46 L: Location<'a>,
47{
48 fn defer_tick(self) -> Self {
49 Optional::defer_tick(self)
50 }
51}
52
53impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
54where
55 L: Location<'a>,
56{
57 type Location = Tick<L>;
58
59 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
60 Optional::new(
61 location.clone(),
62 HydroNode::CycleSource {
63 ident,
64 metadata: location.new_node_metadata::<T>(),
65 },
66 )
67 }
68}
69
70impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
71where
72 L: Location<'a>,
73{
74 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75 assert_eq!(
76 Location::id(&self.location),
77 expected_location,
78 "locations do not match"
79 );
80 self.location
81 .flow_state()
82 .borrow_mut()
83 .push_root(HydroRoot::CycleSink {
84 ident,
85 input: Box::new(self.ir_node.into_inner()),
86 out_location: Location::id(&self.location),
87 op_metadata: HydroIrOpMetadata::new(),
88 });
89 }
90}
91
92impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
93where
94 L: Location<'a>,
95{
96 type Location = Tick<L>;
97
98 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
99 Optional::new(
100 location.clone(),
101 HydroNode::CycleSource {
102 ident,
103 metadata: location.new_node_metadata::<T>(),
104 },
105 )
106 }
107}
108
109impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
110where
111 L: Location<'a>,
112{
113 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
114 assert_eq!(
115 Location::id(&self.location),
116 expected_location,
117 "locations do not match"
118 );
119 self.location
120 .flow_state()
121 .borrow_mut()
122 .push_root(HydroRoot::CycleSink {
123 ident,
124 input: Box::new(self.ir_node.into_inner()),
125 out_location: Location::id(&self.location),
126 op_metadata: HydroIrOpMetadata::new(),
127 });
128 }
129}
130
131impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
132where
133 L: Location<'a> + NoTick,
134{
135 type Location = L;
136
137 fn create_source(ident: syn::Ident, location: L) -> Self {
138 Optional::new(
139 location.clone(),
140 HydroNode::Persist {
141 inner: Box::new(HydroNode::CycleSource {
142 ident,
143 metadata: location.new_node_metadata::<T>(),
144 }),
145 metadata: location.new_node_metadata::<T>(),
146 },
147 )
148 }
149}
150
151impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
152where
153 L: Location<'a> + NoTick,
154{
155 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
156 assert_eq!(
157 Location::id(&self.location),
158 expected_location,
159 "locations do not match"
160 );
161 let metadata = self.location.new_node_metadata::<T>();
162 self.location
163 .flow_state()
164 .borrow_mut()
165 .push_root(HydroRoot::CycleSink {
166 ident,
167 input: Box::new(HydroNode::Unpersist {
168 inner: Box::new(self.ir_node.into_inner()),
169 metadata: metadata.clone(),
170 }),
171 out_location: Location::id(&self.location),
172 op_metadata: HydroIrOpMetadata::new(),
173 });
174 }
175}
176
177impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
178where
179 L: Location<'a>,
180{
181 fn from(singleton: Optional<T, L, Bounded>) -> Self {
182 Optional::new(singleton.location, singleton.ir_node.into_inner())
183 }
184}
185
186impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
187where
188 L: Location<'a>,
189{
190 fn from(singleton: Singleton<T, L, B>) -> Self {
191 Optional::new(singleton.location, singleton.ir_node.into_inner())
192 }
193}
194
195impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
196where
197 T: Clone,
198 L: Location<'a>,
199{
200 fn clone(&self) -> Self {
201 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
202 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
203 *self.ir_node.borrow_mut() = HydroNode::Tee {
204 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
205 metadata: self.location.new_node_metadata::<T>(),
206 };
207 }
208
209 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
210 Optional {
211 location: self.location.clone(),
212 ir_node: HydroNode::Tee {
213 inner: TeeNode(inner.0.clone()),
214 metadata: metadata.clone(),
215 }
216 .into(),
217 _phantom: PhantomData,
218 }
219 } else {
220 unreachable!()
221 }
222 }
223}
224
225impl<'a, T, L, B: Boundedness> Optional<T, L, B>
226where
227 L: Location<'a>,
228{
229 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
230 Optional {
231 location,
232 ir_node: RefCell::new(ir_node),
233 _phantom: PhantomData,
234 }
235 }
236
237 /// Transforms the optional value by applying a function `f` to it,
238 /// continuously as the input is updated.
239 ///
240 /// Whenever the optional is empty, the output optional is also empty.
241 ///
242 /// # Example
243 /// ```rust
244 /// # use hydro_lang::prelude::*;
245 /// # use futures::StreamExt;
246 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
247 /// let tick = process.tick();
248 /// let optional = tick.optional_first_tick(q!(1));
249 /// optional.map(q!(|v| v + 1)).all_ticks()
250 /// # }, |mut stream| async move {
251 /// // 2
252 /// # assert_eq!(stream.next().await.unwrap(), 2);
253 /// # }));
254 /// ```
255 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
256 where
257 F: Fn(T) -> U + 'a,
258 {
259 let f = f.splice_fn1_ctx(&self.location).into();
260 Optional::new(
261 self.location.clone(),
262 HydroNode::Map {
263 f,
264 input: Box::new(self.ir_node.into_inner()),
265 metadata: self.location.new_node_metadata::<U>(),
266 },
267 )
268 }
269
270 /// Transforms the optional value by applying a function `f` to it and then flattening
271 /// the result into a stream, preserving the order of elements.
272 ///
273 /// If the optional is empty, the output stream is also empty. If the optional contains
274 /// a value, `f` is applied to produce an iterator, and all items from that iterator
275 /// are emitted in the output stream in deterministic order.
276 ///
277 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
278 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
279 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
280 ///
281 /// # Example
282 /// ```rust
283 /// # use hydro_lang::prelude::*;
284 /// # use futures::StreamExt;
285 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
286 /// let tick = process.tick();
287 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
288 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
289 /// # }, |mut stream| async move {
290 /// // 1, 2, 3
291 /// # for w in vec![1, 2, 3] {
292 /// # assert_eq!(stream.next().await.unwrap(), w);
293 /// # }
294 /// # }));
295 /// ```
296 pub fn flat_map_ordered<U, I, F>(
297 self,
298 f: impl IntoQuotedMut<'a, F, L>,
299 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
300 where
301 I: IntoIterator<Item = U>,
302 F: Fn(T) -> I + 'a,
303 {
304 let f = f.splice_fn1_ctx(&self.location).into();
305 Stream::new(
306 self.location.clone(),
307 HydroNode::FlatMap {
308 f,
309 input: Box::new(self.ir_node.into_inner()),
310 metadata: self.location.new_node_metadata::<U>(),
311 },
312 )
313 }
314
315 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
316 /// for the output type `I` to produce items in any order.
317 ///
318 /// If the optional is empty, the output stream is also empty. If the optional contains
319 /// a value, `f` is applied to produce an iterator, and all items from that iterator
320 /// are emitted in the output stream in non-deterministic order.
321 ///
322 /// # Example
323 /// ```rust
324 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
325 /// # use futures::StreamExt;
326 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
327 /// let tick = process.tick();
328 /// let optional = tick.optional_first_tick(q!(
329 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
330 /// ));
331 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
332 /// # }, |mut stream| async move {
333 /// // 1, 2, 3, but in no particular order
334 /// # let mut results = Vec::new();
335 /// # for _ in 0..3 {
336 /// # results.push(stream.next().await.unwrap());
337 /// # }
338 /// # results.sort();
339 /// # assert_eq!(results, vec![1, 2, 3]);
340 /// # }));
341 /// ```
342 pub fn flat_map_unordered<U, I, F>(
343 self,
344 f: impl IntoQuotedMut<'a, F, L>,
345 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
346 where
347 I: IntoIterator<Item = U>,
348 F: Fn(T) -> I + 'a,
349 {
350 let f = f.splice_fn1_ctx(&self.location).into();
351 Stream::new(
352 self.location.clone(),
353 HydroNode::FlatMap {
354 f,
355 input: Box::new(self.ir_node.into_inner()),
356 metadata: self.location.new_node_metadata::<U>(),
357 },
358 )
359 }
360
361 /// Flattens the optional value into a stream, preserving the order of elements.
362 ///
363 /// If the optional is empty, the output stream is also empty. If the optional contains
364 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
365 /// in the output stream in deterministic order.
366 ///
367 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
368 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
369 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
370 ///
371 /// # Example
372 /// ```rust
373 /// # use hydro_lang::prelude::*;
374 /// # use futures::StreamExt;
375 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
376 /// let tick = process.tick();
377 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
378 /// optional.flatten_ordered().all_ticks()
379 /// # }, |mut stream| async move {
380 /// // 1, 2, 3
381 /// # for w in vec![1, 2, 3] {
382 /// # assert_eq!(stream.next().await.unwrap(), w);
383 /// # }
384 /// # }));
385 /// ```
386 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
387 where
388 T: IntoIterator<Item = U>,
389 {
390 self.flat_map_ordered(q!(|v| v))
391 }
392
393 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
394 /// for the element type `T` to produce items in any order.
395 ///
396 /// If the optional is empty, the output stream is also empty. If the optional contains
397 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
398 /// in the output stream in non-deterministic order.
399 ///
400 /// # Example
401 /// ```rust
402 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
403 /// # use futures::StreamExt;
404 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
405 /// let tick = process.tick();
406 /// let optional = tick.optional_first_tick(q!(
407 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
408 /// ));
409 /// optional.flatten_unordered().all_ticks()
410 /// # }, |mut stream| async move {
411 /// // 1, 2, 3, but in no particular order
412 /// # let mut results = Vec::new();
413 /// # for _ in 0..3 {
414 /// # results.push(stream.next().await.unwrap());
415 /// # }
416 /// # results.sort();
417 /// # assert_eq!(results, vec![1, 2, 3]);
418 /// # }));
419 /// ```
420 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
421 where
422 T: IntoIterator<Item = U>,
423 {
424 self.flat_map_unordered(q!(|v| v))
425 }
426
427 /// Creates an optional containing only the value if it satisfies a predicate `f`.
428 ///
429 /// If the optional is empty, the output optional is also empty. If the optional contains
430 /// a value and the predicate returns `true`, the output optional contains the same value.
431 /// If the predicate returns `false`, the output optional is empty.
432 ///
433 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
434 /// not modify or take ownership of the value. If you need to modify the value while filtering
435 /// use [`Optional::filter_map`] instead.
436 ///
437 /// # Example
438 /// ```rust
439 /// # use hydro_lang::prelude::*;
440 /// # use futures::StreamExt;
441 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
442 /// let tick = process.tick();
443 /// let optional = tick.optional_first_tick(q!(5));
444 /// optional.filter(q!(|&x| x > 3)).all_ticks()
445 /// # }, |mut stream| async move {
446 /// // 5
447 /// # assert_eq!(stream.next().await.unwrap(), 5);
448 /// # }));
449 /// ```
450 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
451 where
452 F: Fn(&T) -> bool + 'a,
453 {
454 let f = f.splice_fn1_borrow_ctx(&self.location).into();
455 Optional::new(
456 self.location.clone(),
457 HydroNode::Filter {
458 f,
459 input: Box::new(self.ir_node.into_inner()),
460 metadata: self.location.new_node_metadata::<T>(),
461 },
462 )
463 }
464
465 /// An operator that both filters and maps. It yields only the value if the supplied
466 /// closure `f` returns `Some(value)`.
467 ///
468 /// If the optional is empty, the output optional is also empty. If the optional contains
469 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
470 /// If the closure returns `None`, the output optional is empty.
471 ///
472 /// # Example
473 /// ```rust
474 /// # use hydro_lang::prelude::*;
475 /// # use futures::StreamExt;
476 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
477 /// let tick = process.tick();
478 /// let optional = tick.optional_first_tick(q!("42"));
479 /// optional
480 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
481 /// .all_ticks()
482 /// # }, |mut stream| async move {
483 /// // 42
484 /// # assert_eq!(stream.next().await.unwrap(), 42);
485 /// # }));
486 /// ```
487 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
488 where
489 F: Fn(T) -> Option<U> + 'a,
490 {
491 let f = f.splice_fn1_ctx(&self.location).into();
492 Optional::new(
493 self.location.clone(),
494 HydroNode::FilterMap {
495 f,
496 input: Box::new(self.ir_node.into_inner()),
497 metadata: self.location.new_node_metadata::<U>(),
498 },
499 )
500 }
501
502 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
503 ///
504 /// If the other value is a [`Optional`], the output will be non-null only if the argument is
505 /// non-null. This is useful for combining several pieces of state together.
506 ///
507 /// # Example
508 /// ```rust
509 /// # use hydro_lang::prelude::*;
510 /// # use futures::StreamExt;
511 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
512 /// let tick = process.tick();
513 /// let numbers = process
514 /// .source_iter(q!(vec![123, 456, 789]))
515 /// .batch(&tick, nondet!(/** test */));
516 /// let min = numbers.clone().min(); // Optional
517 /// let max = numbers.max(); // Optional
518 /// min.zip(max).all_ticks()
519 /// # }, |mut stream| async move {
520 /// // [(123, 789)]
521 /// # for w in vec![(123, 789)] {
522 /// # assert_eq!(stream.next().await.unwrap(), w);
523 /// # }
524 /// # }));
525 /// ```
526 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
527 where
528 O: Clone,
529 {
530 let other: Optional<O, L, B> = other.into();
531 check_matching_location(&self.location, &other.location);
532
533 if L::is_top_level() {
534 Optional::new(
535 self.location.clone(),
536 HydroNode::Persist {
537 inner: Box::new(HydroNode::CrossSingleton {
538 left: Box::new(HydroNode::Unpersist {
539 inner: Box::new(self.ir_node.into_inner()),
540 metadata: self.location.new_node_metadata::<T>(),
541 }),
542 right: Box::new(HydroNode::Unpersist {
543 inner: Box::new(other.ir_node.into_inner()),
544 metadata: self.location.new_node_metadata::<O>(),
545 }),
546 metadata: self.location.new_node_metadata::<(T, O)>(),
547 }),
548 metadata: self.location.new_node_metadata::<(T, O)>(),
549 },
550 )
551 } else {
552 Optional::new(
553 self.location.clone(),
554 HydroNode::CrossSingleton {
555 left: Box::new(self.ir_node.into_inner()),
556 right: Box::new(other.ir_node.into_inner()),
557 metadata: self.location.new_node_metadata::<(T, O)>(),
558 },
559 )
560 }
561 }
562
563 /// Passes through `self` when it has a value, otherwise passes through `other`.
564 ///
565 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
566 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
567 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
568 ///
569 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
570 /// of the inputs change (including to/from null states).
571 ///
572 /// # Example
573 /// ```rust
574 /// # use hydro_lang::prelude::*;
575 /// # use futures::StreamExt;
576 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
577 /// let tick = process.tick();
578 /// // ticks are lazy by default, forces the second tick to run
579 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
580 ///
581 /// let some_first_tick = tick.optional_first_tick(q!(123));
582 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
583 /// some_first_tick.or(some_second_tick).all_ticks()
584 /// # }, |mut stream| async move {
585 /// // [123 /* first tick */, 456 /* second tick */]
586 /// # for w in vec![123, 456] {
587 /// # assert_eq!(stream.next().await.unwrap(), w);
588 /// # }
589 /// # }));
590 /// ```
591 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
592 check_matching_location(&self.location, &other.location);
593
594 if L::is_top_level() {
595 Optional::new(
596 self.location.clone(),
597 HydroNode::Persist {
598 inner: Box::new(HydroNode::ChainFirst {
599 first: Box::new(HydroNode::Unpersist {
600 inner: Box::new(self.ir_node.into_inner()),
601 metadata: self.location.new_node_metadata::<T>(),
602 }),
603 second: Box::new(HydroNode::Unpersist {
604 inner: Box::new(other.ir_node.into_inner()),
605 metadata: self.location.new_node_metadata::<T>(),
606 }),
607 metadata: self.location.new_node_metadata::<T>(),
608 }),
609 metadata: self.location.new_node_metadata::<T>(),
610 },
611 )
612 } else {
613 Optional::new(
614 self.location.clone(),
615 HydroNode::ChainFirst {
616 first: Box::new(self.ir_node.into_inner()),
617 second: Box::new(other.ir_node.into_inner()),
618 metadata: self.location.new_node_metadata::<T>(),
619 },
620 )
621 }
622 }
623
624 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
625 ///
626 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
627 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
628 ///
629 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
630 /// of the inputs change (including to/from null states).
631 ///
632 /// # Example
633 /// ```rust
634 /// # use hydro_lang::prelude::*;
635 /// # use futures::StreamExt;
636 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
637 /// let tick = process.tick();
638 /// // ticks are lazy by default, forces the later ticks to run
639 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
640 ///
641 /// let some_first_tick = tick.optional_first_tick(q!(123));
642 /// some_first_tick
643 /// .unwrap_or(tick.singleton(q!(456)))
644 /// .all_ticks()
645 /// # }, |mut stream| async move {
646 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
647 /// # for w in vec![123, 456, 456, 456] {
648 /// # assert_eq!(stream.next().await.unwrap(), w);
649 /// # }
650 /// # }));
651 /// ```
652 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
653 let res_option = self.or(other.into());
654 Singleton::new(res_option.location, res_option.ir_node.into_inner())
655 }
656
657 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
658 ///
659 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
660 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
661 /// so that Hydro can skip any computation on null values.
662 ///
663 /// # Example
664 /// ```rust
665 /// # use hydro_lang::prelude::*;
666 /// # use futures::StreamExt;
667 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
668 /// let tick = process.tick();
669 /// // ticks are lazy by default, forces the later ticks to run
670 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
671 ///
672 /// let some_first_tick = tick.optional_first_tick(q!(123));
673 /// some_first_tick.into_singleton().all_ticks()
674 /// # }, |mut stream| async move {
675 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
676 /// # for w in vec![Some(123), None, None, None] {
677 /// # assert_eq!(stream.next().await.unwrap(), w);
678 /// # }
679 /// # }));
680 /// ```
681 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
682 where
683 T: Clone,
684 {
685 let none: syn::Expr = parse_quote!([::std::option::Option::None]);
686 let core_ir = HydroNode::Persist {
687 inner: Box::new(HydroNode::Source {
688 source: HydroSource::Iter(none.into()),
689 metadata: self.location.root().new_node_metadata::<Option<T>>(),
690 }),
691 metadata: self.location.new_node_metadata::<Option<T>>(),
692 };
693
694 let none_singleton = if L::is_top_level() {
695 Singleton::new(
696 self.location.clone(),
697 HydroNode::Persist {
698 inner: Box::new(core_ir),
699 metadata: self.location.new_node_metadata::<Option<T>>(),
700 },
701 )
702 } else {
703 Singleton::new(self.location.clone(), core_ir)
704 };
705
706 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
707 }
708
709 /// An operator which allows you to "name" a `HydroNode`.
710 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
711 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
712 {
713 let mut node = self.ir_node.borrow_mut();
714 let metadata = node.metadata_mut();
715 metadata.tag = Some(name.to_string());
716 }
717 self
718 }
719}
720
721impl<'a, T, L> Optional<T, L, Bounded>
722where
723 L: Location<'a>,
724{
725 /// Filters this optional, passing through the optional value if it is non-null **and** the
726 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
727 ///
728 /// Useful for conditionally processing, such as only emitting an optional's value outside
729 /// a tick if some other condition is satisfied.
730 ///
731 /// # Example
732 /// ```rust
733 /// # use hydro_lang::prelude::*;
734 /// # use futures::StreamExt;
735 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
736 /// let tick = process.tick();
737 /// // ticks are lazy by default, forces the second tick to run
738 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
739 ///
740 /// let batch_first_tick = process
741 /// .source_iter(q!(vec![]))
742 /// .batch(&tick, nondet!(/** test */));
743 /// let batch_second_tick = process
744 /// .source_iter(q!(vec![456]))
745 /// .batch(&tick, nondet!(/** test */))
746 /// .defer_tick(); // appears on the second tick
747 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
748 /// batch_first_tick.chain(batch_second_tick).first()
749 /// .filter_if_some(some_on_first_tick)
750 /// .unwrap_or(tick.singleton(q!(789)))
751 /// .all_ticks()
752 /// # }, |mut stream| async move {
753 /// // [789, 789]
754 /// # for w in vec![789, 789] {
755 /// # assert_eq!(stream.next().await.unwrap(), w);
756 /// # }
757 /// # }));
758 /// ```
759 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
760 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
761 }
762
763 /// Filters this optional, passing through the optional value if it is non-null **and** the
764 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
765 ///
766 /// Useful for conditionally processing, such as only emitting an optional's value outside
767 /// a tick if some other condition is satisfied.
768 ///
769 /// # Example
770 /// ```rust
771 /// # use hydro_lang::prelude::*;
772 /// # use futures::StreamExt;
773 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
774 /// let tick = process.tick();
775 /// // ticks are lazy by default, forces the second tick to run
776 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
777 ///
778 /// let batch_first_tick = process
779 /// .source_iter(q!(vec![]))
780 /// .batch(&tick, nondet!(/** test */));
781 /// let batch_second_tick = process
782 /// .source_iter(q!(vec![456]))
783 /// .batch(&tick, nondet!(/** test */))
784 /// .defer_tick(); // appears on the second tick
785 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
786 /// batch_first_tick.chain(batch_second_tick).first()
787 /// .filter_if_none(some_on_first_tick)
788 /// .unwrap_or(tick.singleton(q!(789)))
789 /// .all_ticks()
790 /// # }, |mut stream| async move {
791 /// // [789, 789]
792 /// # for w in vec![789, 456] {
793 /// # assert_eq!(stream.next().await.unwrap(), w);
794 /// # }
795 /// # }));
796 /// ```
797 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
798 self.filter_if_some(
799 other
800 .map(q!(|_| ()))
801 .into_singleton()
802 .filter(q!(|o| o.is_none())),
803 )
804 }
805
806 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
807 ///
808 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
809 /// having a value, such as only releasing a piece of state if the node is the leader.
810 ///
811 /// # Example
812 /// ```rust
813 /// # use hydro_lang::prelude::*;
814 /// # use futures::StreamExt;
815 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
816 /// let tick = process.tick();
817 /// // ticks are lazy by default, forces the second tick to run
818 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
819 ///
820 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
821 /// some_on_first_tick
822 /// .if_some_then(tick.singleton(q!(456)))
823 /// .unwrap_or(tick.singleton(q!(123)))
824 /// # .all_ticks()
825 /// # }, |mut stream| async move {
826 /// // 456 (first tick) ~> 123 (second tick onwards)
827 /// # for w in vec![456, 123, 123] {
828 /// # assert_eq!(stream.next().await.unwrap(), w);
829 /// # }
830 /// # }));
831 /// ```
832 pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
833 value.filter_if_some(self)
834 }
835}
836
837impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
838where
839 L: Location<'a> + NoTick,
840{
841 /// Returns an optional value corresponding to the latest snapshot of the optional
842 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
843 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
844 /// all snapshots of this optional into the atomic-associated tick will observe the
845 /// same value each tick.
846 ///
847 /// # Non-Determinism
848 /// Because this picks a snapshot of a optional whose value is continuously changing,
849 /// the output optional has a non-deterministic value since the snapshot can be at an
850 /// arbitrary point in time.
851 pub fn snapshot(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
852 Optional::new(
853 self.location.clone().tick,
854 HydroNode::Unpersist {
855 inner: Box::new(self.ir_node.into_inner()),
856 metadata: self.location.new_node_metadata::<T>(),
857 },
858 )
859 }
860
861 /// Returns this optional back into a top-level, asynchronous execution context where updates
862 /// to the value will be asynchronously propagated.
863 pub fn end_atomic(self) -> Optional<T, L, B> {
864 Optional::new(self.location.tick.l, self.ir_node.into_inner())
865 }
866}
867
868impl<'a, T, L, B: Boundedness> Optional<T, L, B>
869where
870 L: Location<'a> + NoTick + NoAtomic,
871{
872 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
873 /// will observe the same version of the value and will be executed synchronously before any
874 /// outputs are yielded (in [`Optional::end_atomic`]).
875 ///
876 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
877 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
878 /// a different version).
879 ///
880 /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
881 /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
882 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
883 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
884 Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
885 }
886
887 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
888 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
889 /// relevant data that contributed to the snapshot at tick `t`.
890 ///
891 /// # Non-Determinism
892 /// Because this picks a snapshot of a optional whose value is continuously changing,
893 /// the output optional has a non-deterministic value since the snapshot can be at an
894 /// arbitrary point in time.
895 pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
896 self.atomic(tick).snapshot(nondet)
897 }
898
899 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
900 /// with order corresponding to increasing prefixes of data contributing to the optional.
901 ///
902 /// # Non-Determinism
903 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
904 /// to non-deterministic batching and arrival of inputs, the output stream is
905 /// non-deterministic.
906 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
907 let tick = self.location.tick();
908 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
909 }
910
911 /// Given a time interval, returns a stream corresponding to snapshots of the optional
912 /// value taken at various points in time. Because the input optional may be
913 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
914 /// represent the value of the optional given some prefix of the streams leading up to
915 /// it.
916 ///
917 /// # Non-Determinism
918 /// The output stream is non-deterministic in which elements are sampled, since this
919 /// is controlled by a clock.
920 pub fn sample_every(
921 self,
922 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
923 nondet: NonDet,
924 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
925 let samples = self.location.source_interval(interval, nondet);
926 let tick = self.location.tick();
927
928 self.snapshot(&tick, nondet)
929 .filter_if_some(samples.batch(&tick, nondet).first())
930 .all_ticks()
931 .weakest_retries()
932 }
933}
934
935impl<'a, T, L> Optional<T, Tick<L>, Bounded>
936where
937 L: Location<'a>,
938{
939 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
940 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
941 /// null values).
942 ///
943 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
944 /// producing one element in the output for each (non-null) tick. This is useful for batched
945 /// computations, where the results from each tick must be combined together.
946 ///
947 /// # Example
948 /// ```rust
949 /// # use hydro_lang::prelude::*;
950 /// # use futures::StreamExt;
951 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
952 /// # let tick = process.tick();
953 /// # // ticks are lazy by default, forces the second tick to run
954 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
955 /// # let batch_first_tick = process
956 /// # .source_iter(q!(vec![]))
957 /// # .batch(&tick, nondet!(/** test */));
958 /// # let batch_second_tick = process
959 /// # .source_iter(q!(vec![1, 2, 3]))
960 /// # .batch(&tick, nondet!(/** test */))
961 /// # .defer_tick(); // appears on the second tick
962 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
963 /// input_batch // first tick: [], second tick: [1, 2, 3]
964 /// .max()
965 /// .all_ticks()
966 /// # }, |mut stream| async move {
967 /// // [3]
968 /// # for w in vec![3] {
969 /// # assert_eq!(stream.next().await.unwrap(), w);
970 /// # }
971 /// # }));
972 /// ```
973 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
974 self.into_stream().all_ticks()
975 }
976
977 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
978 /// which will stream the value computed in _each_ tick as a separate stream element.
979 ///
980 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
981 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
982 /// optional's [`Tick`] context.
983 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
984 self.into_stream().all_ticks_atomic()
985 }
986
987 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
988 /// be asynchronously updated with the latest value of the optional inside the tick, including
989 /// whether the optional is null or not.
990 ///
991 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
992 /// tick that tracks the inner value. This is useful for getting the value as of the
993 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
994 ///
995 /// # Example
996 /// ```rust
997 /// # use hydro_lang::prelude::*;
998 /// # use futures::StreamExt;
999 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1000 /// # let tick = process.tick();
1001 /// # // ticks are lazy by default, forces the second tick to run
1002 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1003 /// # let batch_first_tick = process
1004 /// # .source_iter(q!(vec![]))
1005 /// # .batch(&tick, nondet!(/** test */));
1006 /// # let batch_second_tick = process
1007 /// # .source_iter(q!(vec![1, 2, 3]))
1008 /// # .batch(&tick, nondet!(/** test */))
1009 /// # .defer_tick(); // appears on the second tick
1010 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1011 /// input_batch // first tick: [], second tick: [1, 2, 3]
1012 /// .max()
1013 /// .latest()
1014 /// # .into_singleton()
1015 /// # .sample_eager(nondet!(/** test */))
1016 /// # }, |mut stream| async move {
1017 /// // asynchronously changes from None ~> 3
1018 /// # for w in vec![None, Some(3)] {
1019 /// # assert_eq!(stream.next().await.unwrap(), w);
1020 /// # }
1021 /// # }));
1022 /// ```
1023 pub fn latest(self) -> Optional<T, L, Unbounded> {
1024 Optional::new(
1025 self.location.outer().clone(),
1026 HydroNode::Persist {
1027 inner: Box::new(self.ir_node.into_inner()),
1028 metadata: self.location.new_node_metadata::<T>(),
1029 },
1030 )
1031 }
1032
1033 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1034 /// be updated with the latest value of the optional inside the tick.
1035 ///
1036 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1037 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1038 /// optional's [`Tick`] context.
1039 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1040 Optional::new(
1041 Atomic {
1042 tick: self.location.clone(),
1043 },
1044 HydroNode::Persist {
1045 inner: Box::new(self.ir_node.into_inner()),
1046 metadata: self.location.new_node_metadata::<T>(),
1047 },
1048 )
1049 }
1050
1051 #[expect(missing_docs, reason = "TODO")]
1052 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1053 Optional::new(
1054 self.location.clone(),
1055 HydroNode::DeferTick {
1056 input: Box::new(self.ir_node.into_inner()),
1057 metadata: self.location.new_node_metadata::<T>(),
1058 },
1059 )
1060 }
1061
1062 #[deprecated(note = "use .into_stream().persist()")]
1063 #[expect(missing_docs, reason = "deprecated")]
1064 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1065 Stream::new(
1066 self.location.clone(),
1067 HydroNode::Persist {
1068 inner: Box::new(self.ir_node.into_inner()),
1069 metadata: self.location.new_node_metadata::<T>(),
1070 },
1071 )
1072 }
1073
1074 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1075 /// non-null. Otherwise, the stream is empty.
1076 ///
1077 /// # Example
1078 /// ```rust
1079 /// # use hydro_lang::prelude::*;
1080 /// # use futures::StreamExt;
1081 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1082 /// # let tick = process.tick();
1083 /// # // ticks are lazy by default, forces the second tick to run
1084 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1085 /// # let batch_first_tick = process
1086 /// # .source_iter(q!(vec![]))
1087 /// # .batch(&tick, nondet!(/** test */));
1088 /// # let batch_second_tick = process
1089 /// # .source_iter(q!(vec![123, 456]))
1090 /// # .batch(&tick, nondet!(/** test */))
1091 /// # .defer_tick(); // appears on the second tick
1092 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1093 /// input_batch // first tick: [], second tick: [123, 456]
1094 /// .clone()
1095 /// .max()
1096 /// .into_stream()
1097 /// .chain(input_batch)
1098 /// .all_ticks()
1099 /// # }, |mut stream| async move {
1100 /// // [456, 123, 456]
1101 /// # for w in vec![456, 123, 456] {
1102 /// # assert_eq!(stream.next().await.unwrap(), w);
1103 /// # }
1104 /// # }));
1105 /// ```
1106 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1107 Stream::new(self.location, self.ir_node.into_inner())
1108 }
1109}
1110
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use stageleft::q;

    use super::Optional;
    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    /// Combines an inhabited optional with itself via `or`, counts the elements of the result
    /// in a tick, and checks that the first count observed externally is `1`.
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        // Build an optional that is known to hold a value inside the tick.
        let tick = process.tick();
        let singleton = tick.singleton(q!(123));
        let inhabited: Optional<_, _, _> = singleton.into();

        // `or` the optional with a copy of itself and count what comes out.
        let either = inhabited.clone().or(inhabited);
        let count_port = either
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received = nodes.connect_source_bincode(count_port).await;

        deployment.start().await.unwrap();

        let first_count = received.next().await.unwrap();
        assert_eq!(first_count, 1);
    }
}