hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, NoTick, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25
26/// A *nullable* Rust value that can asynchronously change over time.
27///
28/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
29/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
31///
32/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
33/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
34///
35/// Type Parameters:
36/// - `Type`: the type of the value in this optional (when it is not null)
37/// - `Loc`: the [`Location`] where the optional is materialized
38/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    /// The location where this optional is materialized.
    pub(crate) location: Loc,
    /// The IR node backing this optional. It lives in a `RefCell` so that it can be swapped
    /// out (for example with `HydroNode::Placeholder`) through a shared reference.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Handle to the flow state; the `Drop` implementation pushes a `HydroRoot::Null` root
    /// into it when an unconsumed node is dropped.
    pub(crate) flow_state: FlowState,

    /// Zero-sized marker that ties the `Type`, `Loc`, and `Bound` parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
46
47impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
48 fn drop(&mut self) {
49 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
50 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
51 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
52 input: Box::new(ir_node),
53 op_metadata: HydroIrOpMetadata::new(),
54 });
55 }
56 }
57}
58
59impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
60where
61 T: Clone,
62 L: Location<'a> + NoTick,
63{
64 fn from(value: Optional<T, L, Bounded>) -> Self {
65 let tick = value.location().tick();
66 value.clone_into_tick(&tick).latest()
67 }
68}
69
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    // Forwards the trait method to `Optional::defer_tick`. The target is not visible in this
    // part of the file; presumably it is the inherent method on `Optional` — TODO confirm.
    fn defer_tick(self) -> Self {
        Optional::defer_tick(self)
    }
}
78
79impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
80where
81 L: Location<'a>,
82{
83 type Location = Tick<L>;
84
85 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
86 Optional::new(
87 location.clone(),
88 HydroNode::CycleSource {
89 cycle_id,
90 metadata: location.new_node_metadata(Self::collection_kind()),
91 },
92 )
93 }
94}
95
96impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
97where
98 L: Location<'a>,
99{
100 type Location = Tick<L>;
101
102 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
103 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
104 location.clone(),
105 HydroNode::DeferTick {
106 input: Box::new(HydroNode::CycleSource {
107 cycle_id,
108 metadata: location.new_node_metadata(Self::collection_kind()),
109 }),
110 metadata: location
111 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
112 },
113 );
114
115 from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
116 }
117}
118
119impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
120where
121 L: Location<'a>,
122{
123 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
124 assert_eq!(
125 Location::id(&self.location),
126 expected_location,
127 "locations do not match"
128 );
129 self.location
130 .flow_state()
131 .borrow_mut()
132 .push_root(HydroRoot::CycleSink {
133 cycle_id,
134 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
135 op_metadata: HydroIrOpMetadata::new(),
136 });
137 }
138}
139
140impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
141where
142 L: Location<'a>,
143{
144 type Location = Tick<L>;
145
146 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
147 Optional::new(
148 location.clone(),
149 HydroNode::CycleSource {
150 cycle_id,
151 metadata: location.new_node_metadata(Self::collection_kind()),
152 },
153 )
154 }
155}
156
157impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
158where
159 L: Location<'a>,
160{
161 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
162 assert_eq!(
163 Location::id(&self.location),
164 expected_location,
165 "locations do not match"
166 );
167 self.location
168 .flow_state()
169 .borrow_mut()
170 .push_root(HydroRoot::CycleSink {
171 cycle_id,
172 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
173 op_metadata: HydroIrOpMetadata::new(),
174 });
175 }
176}
177
178impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
179where
180 L: Location<'a> + NoTick,
181{
182 type Location = L;
183
184 fn create_source(cycle_id: CycleId, location: L) -> Self {
185 Optional::new(
186 location.clone(),
187 HydroNode::CycleSource {
188 cycle_id,
189 metadata: location.new_node_metadata(Self::collection_kind()),
190 },
191 )
192 }
193}
194
195impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
196where
197 L: Location<'a> + NoTick,
198{
199 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
200 assert_eq!(
201 Location::id(&self.location),
202 expected_location,
203 "locations do not match"
204 );
205 self.location
206 .flow_state()
207 .borrow_mut()
208 .push_root(HydroRoot::CycleSink {
209 cycle_id,
210 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
211 op_metadata: HydroIrOpMetadata::new(),
212 });
213 }
214}
215
216impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
217where
218 L: Location<'a>,
219{
220 fn from(singleton: Singleton<T, L, B>) -> Self {
221 Optional::new(
222 singleton.location.clone(),
223 HydroNode::Cast {
224 inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
225 metadata: singleton
226 .location
227 .new_node_metadata(Self::collection_kind()),
228 },
229 )
230 }
231}
232
/// Pairs two optionals at the same location by joining their IR nodes with a
/// `HydroNode::CrossSingleton`.
///
/// Both inputs are consumed: their nodes are taken and placeholders are left behind.
#[cfg(stageleft_runtime)]
pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let owner = me.location.clone();
    let left = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let right = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());

    Optional::new(
        owner,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
251
/// Combines two optionals at the same location with a `HydroNode::ChainFirst`, placing
/// `me` in the `first` slot and `other` in the `second` slot.
///
/// Both inputs are consumed: their nodes are taken and placeholders are left behind.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let owner = me.location.clone();
    let first = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let second = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());

    Optional::new(
        owner,
        HydroNode::ChainFirst {
            first,
            second,
            metadata,
        },
    )
}
270
271impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
272where
273 T: Clone,
274 L: Location<'a>,
275{
276 fn clone(&self) -> Self {
277 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
278 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
279 *self.ir_node.borrow_mut() = HydroNode::Tee {
280 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
281 metadata: self.location.new_node_metadata(Self::collection_kind()),
282 };
283 }
284
285 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
286 Optional {
287 location: self.location.clone(),
288 flow_state: self.flow_state.clone(),
289 ir_node: HydroNode::Tee {
290 inner: SharedNode(inner.0.clone()),
291 metadata: metadata.clone(),
292 }
293 .into(),
294 _phantom: PhantomData,
295 }
296 } else {
297 unreachable!()
298 }
299 }
300}
301
302impl<'a, T, L, B: Boundedness> Optional<T, L, B>
303where
304 L: Location<'a>,
305{
306 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
307 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
308 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
309 let flow_state = location.flow_state().clone();
310 Optional {
311 location,
312 flow_state,
313 ir_node: RefCell::new(ir_node),
314 _phantom: PhantomData,
315 }
316 }
317
318 pub(crate) fn collection_kind() -> CollectionKind {
319 CollectionKind::Optional {
320 bound: B::BOUND_KIND,
321 element_type: stageleft::quote_type::<T>().into(),
322 }
323 }
324
325 /// Returns the [`Location`] where this optional is being materialized.
pub fn location(&self) -> &L {
    // Hands out a borrow of the stored location; no clone is made here.
    &self.location
}
329
330 /// Transforms the optional value by applying a function `f` to it,
331 /// continuously as the input is updated.
332 ///
333 /// Whenever the optional is empty, the output optional is also empty.
334 ///
335 /// # Example
336 /// ```rust
337 /// # #[cfg(feature = "deploy")] {
338 /// # use hydro_lang::prelude::*;
339 /// # use futures::StreamExt;
340 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
341 /// let tick = process.tick();
342 /// let optional = tick.optional_first_tick(q!(1));
343 /// optional.map(q!(|v| v + 1)).all_ticks()
344 /// # }, |mut stream| async move {
345 /// // 2
346 /// # assert_eq!(stream.next().await.unwrap(), 2);
347 /// # }));
348 /// # }
349 /// ```
350 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
351 where
352 F: Fn(T) -> U + 'a,
353 {
354 let f = f.splice_fn1_ctx(&self.location).into();
355 Optional::new(
356 self.location.clone(),
357 HydroNode::Map {
358 f,
359 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
360 metadata: self
361 .location
362 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
363 },
364 )
365 }
366
367 /// Transforms the optional value by applying a function `f` to it and then flattening
368 /// the result into a stream, preserving the order of elements.
369 ///
370 /// If the optional is empty, the output stream is also empty. If the optional contains
371 /// a value, `f` is applied to produce an iterator, and all items from that iterator
372 /// are emitted in the output stream in deterministic order.
373 ///
374 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
375 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
376 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
377 ///
378 /// # Example
379 /// ```rust
380 /// # #[cfg(feature = "deploy")] {
381 /// # use hydro_lang::prelude::*;
382 /// # use futures::StreamExt;
383 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
384 /// let tick = process.tick();
385 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
386 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
387 /// # }, |mut stream| async move {
388 /// // 1, 2, 3
389 /// # for w in vec![1, 2, 3] {
390 /// # assert_eq!(stream.next().await.unwrap(), w);
391 /// # }
392 /// # }));
393 /// # }
394 /// ```
395 pub fn flat_map_ordered<U, I, F>(
396 self,
397 f: impl IntoQuotedMut<'a, F, L>,
398 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
399 where
400 B: IsBounded,
401 I: IntoIterator<Item = U>,
402 F: Fn(T) -> I + 'a,
403 {
404 self.into_stream().flat_map_ordered(f)
405 }
406
407 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
408 /// for the output type `I` to produce items in any order.
409 ///
410 /// If the optional is empty, the output stream is also empty. If the optional contains
411 /// a value, `f` is applied to produce an iterator, and all items from that iterator
412 /// are emitted in the output stream in non-deterministic order.
413 ///
414 /// # Example
415 /// ```rust
416 /// # #[cfg(feature = "deploy")] {
417 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
418 /// # use futures::StreamExt;
419 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
420 /// let tick = process.tick();
421 /// let optional = tick.optional_first_tick(q!(
422 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
423 /// ));
424 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
425 /// # }, |mut stream| async move {
426 /// // 1, 2, 3, but in no particular order
427 /// # let mut results = Vec::new();
428 /// # for _ in 0..3 {
429 /// # results.push(stream.next().await.unwrap());
430 /// # }
431 /// # results.sort();
432 /// # assert_eq!(results, vec![1, 2, 3]);
433 /// # }));
434 /// # }
435 /// ```
436 pub fn flat_map_unordered<U, I, F>(
437 self,
438 f: impl IntoQuotedMut<'a, F, L>,
439 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
440 where
441 B: IsBounded,
442 I: IntoIterator<Item = U>,
443 F: Fn(T) -> I + 'a,
444 {
445 self.into_stream().flat_map_unordered(f)
446 }
447
448 /// Flattens the optional value into a stream, preserving the order of elements.
449 ///
450 /// If the optional is empty, the output stream is also empty. If the optional contains
451 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
452 /// in the output stream in deterministic order.
453 ///
454 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
455 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
456 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
457 ///
458 /// # Example
459 /// ```rust
460 /// # #[cfg(feature = "deploy")] {
461 /// # use hydro_lang::prelude::*;
462 /// # use futures::StreamExt;
463 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
464 /// let tick = process.tick();
465 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
466 /// optional.flatten_ordered().all_ticks()
467 /// # }, |mut stream| async move {
468 /// // 1, 2, 3
469 /// # for w in vec![1, 2, 3] {
470 /// # assert_eq!(stream.next().await.unwrap(), w);
471 /// # }
472 /// # }));
473 /// # }
474 /// ```
pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
where
    B: IsBounded,
    T: IntoIterator<Item = U>,
{
    // Flattening is a flat-map with the identity closure: the contained value is itself
    // the iterable handed to `flat_map_ordered`.
    self.flat_map_ordered(q!(|v| v))
}
482
483 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
484 /// for the element type `T` to produce items in any order.
485 ///
486 /// If the optional is empty, the output stream is also empty. If the optional contains
487 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
488 /// in the output stream in non-deterministic order.
489 ///
490 /// # Example
491 /// ```rust
492 /// # #[cfg(feature = "deploy")] {
493 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
494 /// # use futures::StreamExt;
495 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
496 /// let tick = process.tick();
497 /// let optional = tick.optional_first_tick(q!(
498 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
499 /// ));
500 /// optional.flatten_unordered().all_ticks()
501 /// # }, |mut stream| async move {
502 /// // 1, 2, 3, but in no particular order
503 /// # let mut results = Vec::new();
504 /// # for _ in 0..3 {
505 /// # results.push(stream.next().await.unwrap());
506 /// # }
507 /// # results.sort();
508 /// # assert_eq!(results, vec![1, 2, 3]);
509 /// # }));
510 /// # }
511 /// ```
pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
where
    B: IsBounded,
    T: IntoIterator<Item = U>,
{
    // Flattening is a flat-map with the identity closure: the contained value is itself
    // the iterable handed to `flat_map_unordered`.
    self.flat_map_unordered(q!(|v| v))
}
519
520 /// Creates an optional containing only the value if it satisfies a predicate `f`.
521 ///
522 /// If the optional is empty, the output optional is also empty. If the optional contains
523 /// a value and the predicate returns `true`, the output optional contains the same value.
524 /// If the predicate returns `false`, the output optional is empty.
525 ///
/// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
/// not modify or take ownership of the value. If you need to modify the value while filtering,
/// use [`Optional::filter_map`] instead.
529 ///
530 /// # Example
531 /// ```rust
532 /// # #[cfg(feature = "deploy")] {
533 /// # use hydro_lang::prelude::*;
534 /// # use futures::StreamExt;
535 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
536 /// let tick = process.tick();
537 /// let optional = tick.optional_first_tick(q!(5));
538 /// optional.filter(q!(|&x| x > 3)).all_ticks()
539 /// # }, |mut stream| async move {
540 /// // 5
541 /// # assert_eq!(stream.next().await.unwrap(), 5);
542 /// # }));
543 /// # }
544 /// ```
545 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
546 where
547 F: Fn(&T) -> bool + 'a,
548 {
549 let f = f.splice_fn1_borrow_ctx(&self.location).into();
550 Optional::new(
551 self.location.clone(),
552 HydroNode::Filter {
553 f,
554 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
555 metadata: self.location.new_node_metadata(Self::collection_kind()),
556 },
557 )
558 }
559
560 /// An operator that both filters and maps. It yields only the value if the supplied
561 /// closure `f` returns `Some(value)`.
562 ///
563 /// If the optional is empty, the output optional is also empty. If the optional contains
564 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
565 /// If the closure returns `None`, the output optional is empty.
566 ///
567 /// # Example
568 /// ```rust
569 /// # #[cfg(feature = "deploy")] {
570 /// # use hydro_lang::prelude::*;
571 /// # use futures::StreamExt;
572 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
573 /// let tick = process.tick();
574 /// let optional = tick.optional_first_tick(q!("42"));
575 /// optional
576 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
577 /// .all_ticks()
578 /// # }, |mut stream| async move {
579 /// // 42
580 /// # assert_eq!(stream.next().await.unwrap(), 42);
581 /// # }));
582 /// # }
583 /// ```
584 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
585 where
586 F: Fn(T) -> Option<U> + 'a,
587 {
588 let f = f.splice_fn1_ctx(&self.location).into();
589 Optional::new(
590 self.location.clone(),
591 HydroNode::FilterMap {
592 f,
593 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
594 metadata: self
595 .location
596 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
597 },
598 )
599 }
600
/// Combines this optional with another [`Singleton`] or [`Optional`] by tupling their values.
///
/// If the other value is an [`Optional`], the output will be non-null only if the argument is
/// non-null. This is useful for combining several pieces of state together.
605 ///
606 /// # Example
607 /// ```rust
608 /// # #[cfg(feature = "deploy")] {
609 /// # use hydro_lang::prelude::*;
610 /// # use futures::StreamExt;
611 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
612 /// let tick = process.tick();
613 /// let numbers = process
614 /// .source_iter(q!(vec![123, 456, 789]))
615 /// .batch(&tick, nondet!(/** test */));
616 /// let min = numbers.clone().min(); // Optional
617 /// let max = numbers.max(); // Optional
618 /// min.zip(max).all_ticks()
619 /// # }, |mut stream| async move {
620 /// // [(123, 789)]
621 /// # for w in vec![(123, 789)] {
622 /// # assert_eq!(stream.next().await.unwrap(), w);
623 /// # }
624 /// # }));
625 /// # }
626 /// ```
627 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
628 where
629 B: IsBounded,
630 {
631 let other: Optional<O, L, B> = other.into();
632 check_matching_location(&self.location, &other.location);
633
634 if L::is_top_level()
635 && let Some(tick) = self.location.try_tick()
636 {
637 let out = zip_inside_tick(
638 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
639 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
640 )
641 .latest();
642
643 Optional::new(
644 out.location.clone(),
645 out.ir_node.replace(HydroNode::Placeholder),
646 )
647 } else {
648 zip_inside_tick(self, other)
649 }
650 }
651
652 /// Passes through `self` when it has a value, otherwise passes through `other`.
653 ///
654 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
655 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
656 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
657 ///
658 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
659 /// of the inputs change (including to/from null states).
660 ///
661 /// # Example
662 /// ```rust
663 /// # #[cfg(feature = "deploy")] {
664 /// # use hydro_lang::prelude::*;
665 /// # use futures::StreamExt;
666 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
667 /// let tick = process.tick();
668 /// // ticks are lazy by default, forces the second tick to run
669 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
670 ///
671 /// let some_first_tick = tick.optional_first_tick(q!(123));
672 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
673 /// some_first_tick.or(some_second_tick).all_ticks()
674 /// # }, |mut stream| async move {
675 /// // [123 /* first tick */, 456 /* second tick */]
676 /// # for w in vec![123, 456] {
677 /// # assert_eq!(stream.next().await.unwrap(), w);
678 /// # }
679 /// # }));
680 /// # }
681 /// ```
682 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
683 check_matching_location(&self.location, &other.location);
684
685 if L::is_top_level()
686 && !B::BOUNDED // only if unbounded we need to use a tick
687 && let Some(tick) = self.location.try_tick()
688 {
689 let out = or_inside_tick(
690 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
691 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
692 )
693 .latest();
694
695 Optional::new(
696 out.location.clone(),
697 out.ir_node.replace(HydroNode::Placeholder),
698 )
699 } else {
700 Optional::new(
701 self.location.clone(),
702 HydroNode::ChainFirst {
703 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
704 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
705 metadata: self.location.new_node_metadata(Self::collection_kind()),
706 },
707 )
708 }
709 }
710
711 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
712 ///
713 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
714 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
715 ///
716 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
717 /// of the inputs change (including to/from null states).
718 ///
719 /// # Example
720 /// ```rust
721 /// # #[cfg(feature = "deploy")] {
722 /// # use hydro_lang::prelude::*;
723 /// # use futures::StreamExt;
724 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
725 /// let tick = process.tick();
726 /// // ticks are lazy by default, forces the later ticks to run
727 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
728 ///
729 /// let some_first_tick = tick.optional_first_tick(q!(123));
730 /// some_first_tick
731 /// .unwrap_or(tick.singleton(q!(456)))
732 /// .all_ticks()
733 /// # }, |mut stream| async move {
734 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
735 /// # for w in vec![123, 456, 456, 456] {
736 /// # assert_eq!(stream.next().await.unwrap(), w);
737 /// # }
738 /// # }));
739 /// # }
740 /// ```
741 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
742 let res_option = self.or(other.into());
743 Singleton::new(
744 res_option.location.clone(),
745 HydroNode::Cast {
746 inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
747 metadata: res_option
748 .location
749 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
750 },
751 )
752 }
753
754 /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
755 ///
756 /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
757 /// [`Optional`] when the default value of the type is a suitable fallback.
758 ///
759 /// # Example
760 /// ```rust
761 /// # #[cfg(feature = "deploy")] {
762 /// # use hydro_lang::prelude::*;
763 /// # use futures::StreamExt;
764 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
765 /// let tick = process.tick();
766 /// // ticks are lazy by default, forces the later ticks to run
767 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
768 ///
769 /// let some_first_tick = tick.optional_first_tick(q!(123i32));
770 /// some_first_tick.unwrap_or_default().all_ticks()
771 /// # }, |mut stream| async move {
772 /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
773 /// # for w in vec![123, 0, 0, 0] {
774 /// # assert_eq!(stream.next().await.unwrap(), w);
775 /// # }
776 /// # }));
777 /// # }
778 /// ```
779 pub fn unwrap_or_default(self) -> Singleton<T, L, B>
780 where
781 T: Default + Clone,
782 {
783 self.into_singleton().map(q!(|v| v.unwrap_or_default()))
784 }
785
786 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
787 ///
788 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
789 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
790 /// so that Hydro can skip any computation on null values.
791 ///
792 /// # Example
793 /// ```rust
794 /// # #[cfg(feature = "deploy")] {
795 /// # use hydro_lang::prelude::*;
796 /// # use futures::StreamExt;
797 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
798 /// let tick = process.tick();
799 /// // ticks are lazy by default, forces the later ticks to run
800 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
801 ///
802 /// let some_first_tick = tick.optional_first_tick(q!(123));
803 /// some_first_tick.into_singleton().all_ticks()
804 /// # }, |mut stream| async move {
805 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
806 /// # for w in vec![Some(123), None, None, None] {
807 /// # assert_eq!(stream.next().await.unwrap(), w);
808 /// # }
809 /// # }));
810 /// # }
811 /// ```
812 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
813 where
814 T: Clone,
815 {
816 let none: syn::Expr = parse_quote!(::std::option::Option::None);
817
818 let none_singleton = Singleton::new(
819 self.location.clone(),
820 HydroNode::SingletonSource {
821 value: none.into(),
822 first_tick_only: false,
823 metadata: self
824 .location
825 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
826 },
827 );
828
829 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
830 }
831
832 /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
833 ///
834 /// # Example
835 /// ```rust
836 /// # #[cfg(feature = "deploy")] {
837 /// # use hydro_lang::prelude::*;
838 /// # use futures::StreamExt;
839 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
840 /// let tick = process.tick();
841 /// // ticks are lazy by default, forces the second tick to run
842 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
843 ///
844 /// let some_first_tick = tick.optional_first_tick(q!(42));
845 /// some_first_tick.is_some().all_ticks()
846 /// # }, |mut stream| async move {
847 /// // [true /* first tick */, false /* second tick */, ...]
848 /// # for w in vec![true, false] {
849 /// # assert_eq!(stream.next().await.unwrap(), w);
850 /// # }
851 /// # }));
852 /// # }
853 /// ```
854 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
855 pub fn is_some(self) -> Singleton<bool, L, B> {
856 self.map(q!(|_| ()))
857 .into_singleton()
858 .map(q!(|o| o.is_some()))
859 }
860
861 /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
862 ///
863 /// # Example
864 /// ```rust
865 /// # #[cfg(feature = "deploy")] {
866 /// # use hydro_lang::prelude::*;
867 /// # use futures::StreamExt;
868 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
869 /// let tick = process.tick();
870 /// // ticks are lazy by default, forces the second tick to run
871 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
872 ///
873 /// let some_first_tick = tick.optional_first_tick(q!(42));
874 /// some_first_tick.is_none().all_ticks()
875 /// # }, |mut stream| async move {
876 /// // [false /* first tick */, true /* second tick */, ...]
877 /// # for w in vec![false, true] {
878 /// # assert_eq!(stream.next().await.unwrap(), w);
879 /// # }
880 /// # }));
881 /// # }
882 /// ```
883 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
884 pub fn is_none(self) -> Singleton<bool, L, B> {
885 self.map(q!(|_| ()))
886 .into_singleton()
887 .map(q!(|o| o.is_none()))
888 }
889
890 /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
891 /// values are equal, `false` otherwise (including when either is null).
892 ///
893 /// # Example
894 /// ```rust
895 /// # #[cfg(feature = "deploy")] {
896 /// # use hydro_lang::prelude::*;
897 /// # use futures::StreamExt;
898 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
899 /// let tick = process.tick();
900 /// // ticks are lazy by default, forces the second tick to run
901 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
902 ///
903 /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
904 /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
905 /// a.is_some_and_equals(b).all_ticks()
906 /// # }, |mut stream| async move {
907 /// // [true, false]
908 /// # for w in vec![true, false] {
909 /// # assert_eq!(stream.next().await.unwrap(), w);
910 /// # }
911 /// # }));
912 /// # }
913 /// ```
914 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
915 pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
916 where
917 T: PartialEq + Clone,
918 B: IsBounded,
919 {
920 self.into_singleton()
921 .zip(other.into_singleton())
922 .map(q!(|(a, b)| a.is_some() && a == b))
923 }
924
925 /// An operator which allows you to "name" a `HydroNode`.
926 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
927 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
928 {
929 let mut node = self.ir_node.borrow_mut();
930 let metadata = node.metadata_mut();
931 metadata.tag = Some(name.to_owned());
932 }
933 self
934 }
935
936 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
937 /// implies that `B == Bounded`.
938 pub fn make_bounded(self) -> Optional<T, L, Bounded>
939 where
940 B: IsBounded,
941 {
942 Optional::new(
943 self.location.clone(),
944 self.ir_node.replace(HydroNode::Placeholder),
945 )
946 }
947
948 /// Clones this bounded optional into a tick, returning a optional that has the
949 /// same value as the outer optional. Because the outer optional is bounded, this
950 /// is deterministic because there is only a single immutable version.
951 pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
952 where
953 B: IsBounded,
954 T: Clone,
955 {
956 // TODO(shadaj): avoid printing simulator logs for this snapshot
957 self.snapshot(
958 tick,
959 nondet!(/** bounded top-level optional so deterministic */),
960 )
961 }
962
963 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
964 /// non-null. Otherwise, the stream is empty.
965 ///
966 /// # Example
967 /// ```rust
968 /// # #[cfg(feature = "deploy")] {
969 /// # use hydro_lang::prelude::*;
970 /// # use futures::StreamExt;
971 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
972 /// # let tick = process.tick();
973 /// # // ticks are lazy by default, forces the second tick to run
974 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
975 /// # let batch_first_tick = process
976 /// # .source_iter(q!(vec![]))
977 /// # .batch(&tick, nondet!(/** test */));
978 /// # let batch_second_tick = process
979 /// # .source_iter(q!(vec![123, 456]))
980 /// # .batch(&tick, nondet!(/** test */))
981 /// # .defer_tick(); // appears on the second tick
982 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
983 /// input_batch // first tick: [], second tick: [123, 456]
984 /// .clone()
985 /// .max()
986 /// .into_stream()
987 /// .chain(input_batch)
988 /// .all_ticks()
989 /// # }, |mut stream| async move {
990 /// // [456, 123, 456]
991 /// # for w in vec![456, 123, 456] {
992 /// # assert_eq!(stream.next().await.unwrap(), w);
993 /// # }
994 /// # }));
995 /// # }
996 /// ```
997 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
998 where
999 B: IsBounded,
1000 {
1001 Stream::new(
1002 self.location.clone(),
1003 HydroNode::Cast {
1004 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1005 metadata: self.location.new_node_metadata(Stream::<
1006 T,
1007 Tick<L>,
1008 Bounded,
1009 TotalOrder,
1010 ExactlyOnce,
1011 >::collection_kind()),
1012 },
1013 )
1014 }
1015
1016 /// Filters this optional, passing through the value if the boolean signal is `true`,
1017 /// otherwise the output is null.
1018 ///
1019 /// # Example
1020 /// ```rust
1021 /// # #[cfg(feature = "deploy")] {
1022 /// # use hydro_lang::prelude::*;
1023 /// # use futures::StreamExt;
1024 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1025 /// let tick = process.tick();
1026 /// // ticks are lazy by default, forces the second tick to run
1027 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1028 ///
1029 /// let some_first_tick = tick.optional_first_tick(q!(()));
1030 /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1031 /// let batch_first_tick = process
1032 /// .source_iter(q!(vec![456]))
1033 /// .batch(&tick, nondet!(/** test */));
1034 /// let batch_second_tick = process
1035 /// .source_iter(q!(vec![789]))
1036 /// .batch(&tick, nondet!(/** test */))
1037 /// .defer_tick();
1038 /// batch_first_tick.chain(batch_second_tick).first()
1039 /// .filter_if(signal)
1040 /// .unwrap_or(tick.singleton(q!(0)))
1041 /// .all_ticks()
1042 /// # }, |mut stream| async move {
1043 /// // [456, 0]
1044 /// # for w in vec![456, 0] {
1045 /// # assert_eq!(stream.next().await.unwrap(), w);
1046 /// # }
1047 /// # }));
1048 /// # }
1049 /// ```
1050 pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1051 where
1052 B: IsBounded,
1053 {
1054 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1055 }
1056
1057 /// Filters this optional, passing through the optional value if it is non-null **and** the
1058 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1059 ///
1060 /// Useful for conditionally processing, such as only emitting an optional's value outside
1061 /// a tick if some other condition is satisfied.
1062 ///
1063 /// # Example
1064 /// ```rust
1065 /// # #[cfg(feature = "deploy")] {
1066 /// # use hydro_lang::prelude::*;
1067 /// # use futures::StreamExt;
1068 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1069 /// let tick = process.tick();
1070 /// // ticks are lazy by default, forces the second tick to run
1071 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1072 ///
1073 /// let batch_first_tick = process
1074 /// .source_iter(q!(vec![]))
1075 /// .batch(&tick, nondet!(/** test */));
1076 /// let batch_second_tick = process
1077 /// .source_iter(q!(vec![456]))
1078 /// .batch(&tick, nondet!(/** test */))
1079 /// .defer_tick(); // appears on the second tick
1080 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1081 /// batch_first_tick.chain(batch_second_tick).first()
1082 /// .filter_if_some(some_on_first_tick)
1083 /// .unwrap_or(tick.singleton(q!(789)))
1084 /// .all_ticks()
1085 /// # }, |mut stream| async move {
1086 /// // [789, 789]
1087 /// # for w in vec![789, 789] {
1088 /// # assert_eq!(stream.next().await.unwrap(), w);
1089 /// # }
1090 /// # }));
1091 /// # }
1092 /// ```
1093 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1094 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1095 where
1096 B: IsBounded,
1097 {
1098 self.filter_if(signal.is_some())
1099 }
1100
1101 /// Filters this optional, passing through the optional value if it is non-null **and** the
1102 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1103 ///
1104 /// Useful for conditionally processing, such as only emitting an optional's value outside
1105 /// a tick if some other condition is satisfied.
1106 ///
1107 /// # Example
1108 /// ```rust
1109 /// # #[cfg(feature = "deploy")] {
1110 /// # use hydro_lang::prelude::*;
1111 /// # use futures::StreamExt;
1112 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1113 /// let tick = process.tick();
1114 /// // ticks are lazy by default, forces the second tick to run
1115 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1116 ///
1117 /// let batch_first_tick = process
1118 /// .source_iter(q!(vec![]))
1119 /// .batch(&tick, nondet!(/** test */));
1120 /// let batch_second_tick = process
1121 /// .source_iter(q!(vec![456]))
1122 /// .batch(&tick, nondet!(/** test */))
1123 /// .defer_tick(); // appears on the second tick
1124 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1125 /// batch_first_tick.chain(batch_second_tick).first()
1126 /// .filter_if_none(some_on_first_tick)
1127 /// .unwrap_or(tick.singleton(q!(789)))
1128 /// .all_ticks()
1129 /// # }, |mut stream| async move {
1130 /// // [789, 789]
1131 /// # for w in vec![789, 456] {
1132 /// # assert_eq!(stream.next().await.unwrap(), w);
1133 /// # }
1134 /// # }));
1135 /// # }
1136 /// ```
1137 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1138 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1139 where
1140 B: IsBounded,
1141 {
1142 self.filter_if(other.is_none())
1143 }
1144
1145 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1146 ///
1147 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1148 /// having a value, such as only releasing a piece of state if the node is the leader.
1149 ///
1150 /// # Example
1151 /// ```rust
1152 /// # #[cfg(feature = "deploy")] {
1153 /// # use hydro_lang::prelude::*;
1154 /// # use futures::StreamExt;
1155 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1156 /// let tick = process.tick();
1157 /// // ticks are lazy by default, forces the second tick to run
1158 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1159 ///
1160 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1161 /// some_on_first_tick
1162 /// .if_some_then(tick.singleton(q!(456)))
1163 /// .unwrap_or(tick.singleton(q!(123)))
1164 /// # .all_ticks()
1165 /// # }, |mut stream| async move {
1166 /// // 456 (first tick) ~> 123 (second tick onwards)
1167 /// # for w in vec![456, 123, 123] {
1168 /// # assert_eq!(stream.next().await.unwrap(), w);
1169 /// # }
1170 /// # }));
1171 /// # }
1172 /// ```
1173 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1174 pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1175 where
1176 B: IsBounded,
1177 {
1178 value.filter_if(self.is_some())
1179 }
1180}
1181
1182impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1183where
1184 L: Location<'a> + NoTick,
1185{
1186 /// Returns an optional value corresponding to the latest snapshot of the optional
1187 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1188 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1189 /// all snapshots of this optional into the atomic-associated tick will observe the
1190 /// same value each tick.
1191 ///
1192 /// # Non-Determinism
1193 /// Because this picks a snapshot of a optional whose value is continuously changing,
1194 /// the output optional has a non-deterministic value since the snapshot can be at an
1195 /// arbitrary point in time.
1196 pub fn snapshot_atomic(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1197 Optional::new(
1198 tick.clone(),
1199 HydroNode::Batch {
1200 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1201 metadata: tick
1202 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1203 },
1204 )
1205 }
1206
1207 /// Returns this optional back into a top-level, asynchronous execution context where updates
1208 /// to the value will be asynchronously propagated.
1209 pub fn end_atomic(self) -> Optional<T, L, B> {
1210 Optional::new(
1211 self.location.tick.l.clone(),
1212 HydroNode::EndAtomic {
1213 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1214 metadata: self
1215 .location
1216 .tick
1217 .l
1218 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1219 },
1220 )
1221 }
1222}
1223
1224impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1225where
1226 L: Location<'a>,
1227{
1228 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1229 /// will observe the same version of the value and will be executed synchronously before any
1230 /// outputs are yielded (in [`Optional::end_atomic`]).
1231 ///
1232 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1233 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1234 /// a different version).
1235 pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1236 let id = self.location.flow_state().borrow_mut().next_clock_id();
1237 let out_location = Atomic {
1238 tick: Tick {
1239 id,
1240 l: self.location.clone(),
1241 },
1242 };
1243 Optional::new(
1244 out_location.clone(),
1245 HydroNode::BeginAtomic {
1246 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1247 metadata: out_location
1248 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1249 },
1250 )
1251 }
1252
1253 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1254 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1255 /// relevant data that contributed to the snapshot at tick `t`.
1256 ///
1257 /// # Non-Determinism
1258 /// Because this picks a snapshot of a optional whose value is continuously changing,
1259 /// the output optional has a non-deterministic value since the snapshot can be at an
1260 /// arbitrary point in time.
1261 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1262 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1263 Optional::new(
1264 tick.clone(),
1265 HydroNode::Batch {
1266 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1267 metadata: tick
1268 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1269 },
1270 )
1271 }
1272
1273 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1274 /// with order corresponding to increasing prefixes of data contributing to the optional.
1275 ///
1276 /// # Non-Determinism
1277 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1278 /// to non-deterministic batching and arrival of inputs, the output stream is
1279 /// non-deterministic.
1280 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1281 where
1282 L: NoTick,
1283 {
1284 let tick = self.location.tick();
1285 self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1286 }
1287
1288 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1289 /// value taken at various points in time. Because the input optional may be
1290 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1291 /// represent the value of the optional given some prefix of the streams leading up to
1292 /// it.
1293 ///
1294 /// # Non-Determinism
1295 /// The output stream is non-deterministic in which elements are sampled, since this
1296 /// is controlled by a clock.
1297 pub fn sample_every(
1298 self,
1299 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1300 nondet: NonDet,
1301 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1302 where
1303 L: NoTick + NoAtomic,
1304 {
1305 let samples = self.location.source_interval(interval, nondet);
1306 let tick = self.location.tick();
1307
1308 self.snapshot(&tick, nondet)
1309 .filter_if(samples.batch(&tick, nondet).first().is_some())
1310 .all_ticks()
1311 .weaken_retries()
1312 }
1313}
1314
1315impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1316where
1317 L: Location<'a>,
1318{
1319 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1320 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1321 /// null values).
1322 ///
1323 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1324 /// producing one element in the output for each (non-null) tick. This is useful for batched
1325 /// computations, where the results from each tick must be combined together.
1326 ///
1327 /// # Example
1328 /// ```rust
1329 /// # #[cfg(feature = "deploy")] {
1330 /// # use hydro_lang::prelude::*;
1331 /// # use futures::StreamExt;
1332 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1333 /// # let tick = process.tick();
1334 /// # // ticks are lazy by default, forces the second tick to run
1335 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1336 /// # let batch_first_tick = process
1337 /// # .source_iter(q!(vec![]))
1338 /// # .batch(&tick, nondet!(/** test */));
1339 /// # let batch_second_tick = process
1340 /// # .source_iter(q!(vec![1, 2, 3]))
1341 /// # .batch(&tick, nondet!(/** test */))
1342 /// # .defer_tick(); // appears on the second tick
1343 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1344 /// input_batch // first tick: [], second tick: [1, 2, 3]
1345 /// .max()
1346 /// .all_ticks()
1347 /// # }, |mut stream| async move {
1348 /// // [3]
1349 /// # for w in vec![3] {
1350 /// # assert_eq!(stream.next().await.unwrap(), w);
1351 /// # }
1352 /// # }));
1353 /// # }
1354 /// ```
1355 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1356 self.into_stream().all_ticks()
1357 }
1358
1359 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1360 /// which will stream the value computed in _each_ tick as a separate stream element.
1361 ///
1362 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1363 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1364 /// optional's [`Tick`] context.
1365 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1366 self.into_stream().all_ticks_atomic()
1367 }
1368
1369 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1370 /// be asynchronously updated with the latest value of the optional inside the tick, including
1371 /// whether the optional is null or not.
1372 ///
1373 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1374 /// tick that tracks the inner value. This is useful for getting the value as of the
1375 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1376 ///
1377 /// # Example
1378 /// ```rust
1379 /// # #[cfg(feature = "deploy")] {
1380 /// # use hydro_lang::prelude::*;
1381 /// # use futures::StreamExt;
1382 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1383 /// # let tick = process.tick();
1384 /// # // ticks are lazy by default, forces the second tick to run
1385 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1386 /// # let batch_first_tick = process
1387 /// # .source_iter(q!(vec![]))
1388 /// # .batch(&tick, nondet!(/** test */));
1389 /// # let batch_second_tick = process
1390 /// # .source_iter(q!(vec![1, 2, 3]))
1391 /// # .batch(&tick, nondet!(/** test */))
1392 /// # .defer_tick(); // appears on the second tick
1393 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1394 /// input_batch // first tick: [], second tick: [1, 2, 3]
1395 /// .max()
1396 /// .latest()
1397 /// # .into_singleton()
1398 /// # .sample_eager(nondet!(/** test */))
1399 /// # }, |mut stream| async move {
1400 /// // asynchronously changes from None ~> 3
1401 /// # for w in vec![None, Some(3)] {
1402 /// # assert_eq!(stream.next().await.unwrap(), w);
1403 /// # }
1404 /// # }));
1405 /// # }
1406 /// ```
1407 pub fn latest(self) -> Optional<T, L, Unbounded> {
1408 Optional::new(
1409 self.location.outer().clone(),
1410 HydroNode::YieldConcat {
1411 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1412 metadata: self
1413 .location
1414 .outer()
1415 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1416 },
1417 )
1418 }
1419
1420 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1421 /// be updated with the latest value of the optional inside the tick.
1422 ///
1423 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1424 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1425 /// optional's [`Tick`] context.
1426 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1427 let out_location = Atomic {
1428 tick: self.location.clone(),
1429 };
1430
1431 Optional::new(
1432 out_location.clone(),
1433 HydroNode::YieldConcat {
1434 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1435 metadata: out_location
1436 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1437 },
1438 )
1439 }
1440
1441 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1442 /// always has the state of `self` at tick `T - 1`.
1443 ///
1444 /// At tick `0`, the output optional is null, since there is no previous tick.
1445 ///
1446 /// This operator enables stateful iterative processing with ticks, by sending data from one
1447 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1448 ///
1449 /// # Example
1450 /// ```rust
1451 /// # #[cfg(feature = "deploy")] {
1452 /// # use hydro_lang::prelude::*;
1453 /// # use futures::StreamExt;
1454 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1455 /// let tick = process.tick();
1456 /// // ticks are lazy by default, forces the second tick to run
1457 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1458 ///
1459 /// let batch_first_tick = process
1460 /// .source_iter(q!(vec![1, 2]))
1461 /// .batch(&tick, nondet!(/** test */));
1462 /// let batch_second_tick = process
1463 /// .source_iter(q!(vec![3, 4]))
1464 /// .batch(&tick, nondet!(/** test */))
1465 /// .defer_tick(); // appears on the second tick
1466 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1467 /// .reduce(q!(|state, v| *state += v));
1468 ///
1469 /// current_tick_sum.clone().into_singleton().zip(
1470 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1471 /// ).all_ticks()
1472 /// # }, |mut stream| async move {
1473 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1474 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1475 /// # assert_eq!(stream.next().await.unwrap(), w);
1476 /// # }
1477 /// # }));
1478 /// # }
1479 /// ```
1480 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1481 Optional::new(
1482 self.location.clone(),
1483 HydroNode::DeferTick {
1484 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1485 metadata: self.location.new_node_metadata(Self::collection_kind()),
1486 },
1487 )
1488 }
1489}
1490
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::StreamExt;
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(feature = "deploy")]
    use super::Optional;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(feature = "deploy")]
    use crate::nondet::nondet;

    /// `or`-ing an inhabited optional with itself must still contain exactly one element.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let sink = builder.external::<()>();

        let tick = process.tick();
        let inhabited: Optional<_, _, _> = tick.singleton(q!(123)).into();
        let count_port = inhabited
            .clone()
            .or(inhabited)
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&sink);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&sink, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received = deployed.connect(count_port).await;

        deployment.start().await.unwrap();

        assert_eq!(received.next().await.unwrap(), 1);
    }

    /// A null top-level optional turned into a singleton must snapshot to exactly one
    /// element in every observed tick.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let sink = builder.external::<()>();

        let tick = process.tick();
        let as_singleton = process
            .singleton(q!(123))
            .filter(q!(|_| false))
            .into_singleton();

        // Keeps ticks running so that several snapshots can be observed.
        let driver = process.spin();

        let count_port = as_singleton
            .snapshot(&tick, nondet!(/** test */))
            .into_stream()
            .count()
            .zip(driver.batch(&tick, nondet!(/** test */)).count())
            .map(q!(|(snapshot_count, _)| snapshot_count))
            .all_ticks()
            .send_bincode_external(&sink);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&sink, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received = deployed.connect(count_port).await;

        deployment.start().await.unwrap();

        for _ in 0..3 {
            assert_eq!(received.next().await.unwrap(), 1);
        }
    }

    /// Same as above, but the null optional is unbounded (derived via `latest()`).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let sink = builder.external::<()>();

        let tick = process.tick();
        let as_singleton = tick
            .singleton(q!(123))
            .latest()
            .filter(q!(|_| false))
            .into_singleton();

        // Keeps ticks running so that several snapshots can be observed.
        let driver = process.spin();

        let count_port = as_singleton
            .snapshot(&tick, nondet!(/** test */))
            .into_stream()
            .count()
            .zip(driver.batch(&tick, nondet!(/** test */)).count())
            .map(q!(|(snapshot_count, _)| snapshot_count))
            .all_ticks()
            .send_bincode_external(&sink);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&sink, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received = deployed.connect(count_port).await;

        deployment.start().await.unwrap();

        for _ in 0..3 {
            assert_eq!(received.next().await.unwrap(), 1);
        }
    }

    /// A non-null top-level optional converted to a stream yields its value and nothing else.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_some_into_stream_no_replay() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();

        let kept = process
            .source_iter(q!(vec![1, 2, 3, 4]))
            .fold(q!(|| 0), q!(|acc, x| *acc += x))
            .filter(q!(|_| true));

        let output = kept.into_stream().sim_output();

        builder.sim().exhaustive(async || {
            output.assert_yields_only([10]).await;
        });
    }

    /// A null top-level optional converted to a stream yields nothing.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_none_into_stream_no_replay() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();

        let dropped = process
            .source_iter(q!(vec![1, 2, 3, 4]))
            .fold(q!(|| 0), q!(|acc, x| *acc += x))
            .filter(q!(|_| false));

        let output = dropped.into_stream().sim_output();

        builder.sim().exhaustive(async || {
            output.assert_yields_only([] as [i32; 0]).await;
        });
    }
}