hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, DeferTick, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
/// A *nullable* Rust value that can asynchronously change over time.
///
/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
///
/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
///
/// Type Parameters:
/// - `Type`: the type of the value in this optional (when it is not null)
/// - `Loc`: the [`Location`] where the optional is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    // The location where this optional is materialized.
    pub(crate) location: Loc,
    // The IR node that produces this optional. Operators take the node out and leave a
    // `HydroNode::Placeholder` behind, which is why it sits in a `RefCell`.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Flow state cloned from `location` at construction; `Drop` pushes unconsumed nodes to it.
    pub(crate) flow_state: FlowState,

    // Carries the type parameters without storing any values of those types.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
45
46impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
47 fn drop(&mut self) {
48 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
49 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
50 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
51 input: Box::new(ir_node),
52 op_metadata: HydroIrOpMetadata::new(),
53 });
54 }
55 }
56}
57
58impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
59where
60 T: Clone,
61 L: Location<'a> + NoTick,
62{
63 fn from(value: Optional<T, L, Bounded>) -> Self {
64 let tick = value.location().tick();
65 value.clone_into_tick(&tick).latest()
66 }
67}
68
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    // Trait entry point that forwards to `Optional::defer_tick`.
    // NOTE(review): presumably this resolves to an inherent `defer_tick` defined elsewhere
    // in this file; confirm before changing the call form, since resolving to this trait
    // method instead would recurse.
    fn defer_tick(self) -> Self {
        Optional::defer_tick(self)
    }
}
77
78impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
79where
80 L: Location<'a>,
81{
82 type Location = Tick<L>;
83
84 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
85 Optional::new(
86 location.clone(),
87 HydroNode::CycleSource {
88 cycle_id,
89 metadata: location.new_node_metadata(Self::collection_kind()),
90 },
91 )
92 }
93}
94
95impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
96where
97 L: Location<'a>,
98{
99 type Location = Tick<L>;
100
101 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
102 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
103 location.clone(),
104 HydroNode::DeferTick {
105 input: Box::new(HydroNode::CycleSource {
106 cycle_id,
107 metadata: location.new_node_metadata(Self::collection_kind()),
108 }),
109 metadata: location
110 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
111 },
112 );
113
114 from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
115 }
116}
117
118impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
119where
120 L: Location<'a>,
121{
122 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
123 assert_eq!(
124 Location::id(&self.location),
125 expected_location,
126 "locations do not match"
127 );
128 self.location
129 .flow_state()
130 .borrow_mut()
131 .push_root(HydroRoot::CycleSink {
132 cycle_id,
133 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
134 op_metadata: HydroIrOpMetadata::new(),
135 });
136 }
137}
138
139impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
140where
141 L: Location<'a>,
142{
143 type Location = Tick<L>;
144
145 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
146 Optional::new(
147 location.clone(),
148 HydroNode::CycleSource {
149 cycle_id,
150 metadata: location.new_node_metadata(Self::collection_kind()),
151 },
152 )
153 }
154}
155
156impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
157where
158 L: Location<'a>,
159{
160 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
161 assert_eq!(
162 Location::id(&self.location),
163 expected_location,
164 "locations do not match"
165 );
166 self.location
167 .flow_state()
168 .borrow_mut()
169 .push_root(HydroRoot::CycleSink {
170 cycle_id,
171 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
172 op_metadata: HydroIrOpMetadata::new(),
173 });
174 }
175}
176
177impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
178where
179 L: Location<'a> + NoTick,
180{
181 type Location = L;
182
183 fn create_source(cycle_id: CycleId, location: L) -> Self {
184 Optional::new(
185 location.clone(),
186 HydroNode::CycleSource {
187 cycle_id,
188 metadata: location.new_node_metadata(Self::collection_kind()),
189 },
190 )
191 }
192}
193
194impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
195where
196 L: Location<'a> + NoTick,
197{
198 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
199 assert_eq!(
200 Location::id(&self.location),
201 expected_location,
202 "locations do not match"
203 );
204 self.location
205 .flow_state()
206 .borrow_mut()
207 .push_root(HydroRoot::CycleSink {
208 cycle_id,
209 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
210 op_metadata: HydroIrOpMetadata::new(),
211 });
212 }
213}
214
215impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
216where
217 L: Location<'a>,
218{
219 fn from(singleton: Singleton<T, L, B>) -> Self {
220 Optional::new(
221 singleton.location.clone(),
222 HydroNode::Cast {
223 inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
224 metadata: singleton
225 .location
226 .new_node_metadata(Self::collection_kind()),
227 },
228 )
229 }
230}
231
/// Pairs two optionals at the same location through a `HydroNode::CrossSingleton` node.
///
/// Both inputs are consumed; their nodes become the `left` and `right` inputs.
#[cfg(stageleft_runtime)]
pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let location = me.location.clone();
    let left = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let right = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());

    Optional::new(
        location,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
250
/// Combines two optionals at the same location through a `HydroNode::ChainFirst` node,
/// with `me` as the `first` input and `other` as the `second`.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let location = me.location.clone();
    let first = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let second = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());

    Optional::new(
        location,
        HydroNode::ChainFirst {
            first,
            second,
            metadata,
        },
    )
}
269
270impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
271where
272 T: Clone,
273 L: Location<'a>,
274{
275 fn clone(&self) -> Self {
276 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
277 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
278 *self.ir_node.borrow_mut() = HydroNode::Tee {
279 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
280 metadata: self.location.new_node_metadata(Self::collection_kind()),
281 };
282 }
283
284 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
285 Optional {
286 location: self.location.clone(),
287 flow_state: self.flow_state.clone(),
288 ir_node: HydroNode::Tee {
289 inner: SharedNode(inner.0.clone()),
290 metadata: metadata.clone(),
291 }
292 .into(),
293 _phantom: PhantomData,
294 }
295 } else {
296 unreachable!()
297 }
298 }
299}
300
301impl<'a, T, L, B: Boundedness> Optional<T, L, B>
302where
303 L: Location<'a>,
304{
305 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
306 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
307 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
308 let flow_state = location.flow_state().clone();
309 Optional {
310 location,
311 flow_state,
312 ir_node: RefCell::new(ir_node),
313 _phantom: PhantomData,
314 }
315 }
316
317 pub(crate) fn collection_kind() -> CollectionKind {
318 CollectionKind::Optional {
319 bound: B::BOUND_KIND,
320 element_type: stageleft::quote_type::<T>().into(),
321 }
322 }
323
324 /// Returns the [`Location`] where this optional is being materialized.
325 pub fn location(&self) -> &L {
326 &self.location
327 }
328
329 /// Transforms the optional value by applying a function `f` to it,
330 /// continuously as the input is updated.
331 ///
332 /// Whenever the optional is empty, the output optional is also empty.
333 ///
334 /// # Example
335 /// ```rust
336 /// # #[cfg(feature = "deploy")] {
337 /// # use hydro_lang::prelude::*;
338 /// # use futures::StreamExt;
339 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
340 /// let tick = process.tick();
341 /// let optional = tick.optional_first_tick(q!(1));
342 /// optional.map(q!(|v| v + 1)).all_ticks()
343 /// # }, |mut stream| async move {
344 /// // 2
345 /// # assert_eq!(stream.next().await.unwrap(), 2);
346 /// # }));
347 /// # }
348 /// ```
349 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
350 where
351 F: Fn(T) -> U + 'a,
352 {
353 let f = f.splice_fn1_ctx(&self.location).into();
354 Optional::new(
355 self.location.clone(),
356 HydroNode::Map {
357 f,
358 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
359 metadata: self
360 .location
361 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
362 },
363 )
364 }
365
366 /// Transforms the optional value by applying a function `f` to it and then flattening
367 /// the result into a stream, preserving the order of elements.
368 ///
369 /// If the optional is empty, the output stream is also empty. If the optional contains
370 /// a value, `f` is applied to produce an iterator, and all items from that iterator
371 /// are emitted in the output stream in deterministic order.
372 ///
373 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
374 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
375 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
376 ///
377 /// # Example
378 /// ```rust
379 /// # #[cfg(feature = "deploy")] {
380 /// # use hydro_lang::prelude::*;
381 /// # use futures::StreamExt;
382 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
383 /// let tick = process.tick();
384 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
385 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
386 /// # }, |mut stream| async move {
387 /// // 1, 2, 3
388 /// # for w in vec![1, 2, 3] {
389 /// # assert_eq!(stream.next().await.unwrap(), w);
390 /// # }
391 /// # }));
392 /// # }
393 /// ```
394 pub fn flat_map_ordered<U, I, F>(
395 self,
396 f: impl IntoQuotedMut<'a, F, L>,
397 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
398 where
399 I: IntoIterator<Item = U>,
400 F: Fn(T) -> I + 'a,
401 {
402 let f = f.splice_fn1_ctx(&self.location).into();
403 Stream::new(
404 self.location.clone(),
405 HydroNode::FlatMap {
406 f,
407 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
408 metadata: self.location.new_node_metadata(
409 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
410 ),
411 },
412 )
413 }
414
415 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
416 /// for the output type `I` to produce items in any order.
417 ///
418 /// If the optional is empty, the output stream is also empty. If the optional contains
419 /// a value, `f` is applied to produce an iterator, and all items from that iterator
420 /// are emitted in the output stream in non-deterministic order.
421 ///
422 /// # Example
423 /// ```rust
424 /// # #[cfg(feature = "deploy")] {
425 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
426 /// # use futures::StreamExt;
427 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
428 /// let tick = process.tick();
429 /// let optional = tick.optional_first_tick(q!(
430 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
431 /// ));
432 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
433 /// # }, |mut stream| async move {
434 /// // 1, 2, 3, but in no particular order
435 /// # let mut results = Vec::new();
436 /// # for _ in 0..3 {
437 /// # results.push(stream.next().await.unwrap());
438 /// # }
439 /// # results.sort();
440 /// # assert_eq!(results, vec![1, 2, 3]);
441 /// # }));
442 /// # }
443 /// ```
444 pub fn flat_map_unordered<U, I, F>(
445 self,
446 f: impl IntoQuotedMut<'a, F, L>,
447 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
448 where
449 I: IntoIterator<Item = U>,
450 F: Fn(T) -> I + 'a,
451 {
452 let f = f.splice_fn1_ctx(&self.location).into();
453 Stream::new(
454 self.location.clone(),
455 HydroNode::FlatMap {
456 f,
457 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
458 metadata: self
459 .location
460 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
461 },
462 )
463 }
464
465 /// Flattens the optional value into a stream, preserving the order of elements.
466 ///
467 /// If the optional is empty, the output stream is also empty. If the optional contains
468 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
469 /// in the output stream in deterministic order.
470 ///
471 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
472 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
473 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
474 ///
475 /// # Example
476 /// ```rust
477 /// # #[cfg(feature = "deploy")] {
478 /// # use hydro_lang::prelude::*;
479 /// # use futures::StreamExt;
480 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
481 /// let tick = process.tick();
482 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
483 /// optional.flatten_ordered().all_ticks()
484 /// # }, |mut stream| async move {
485 /// // 1, 2, 3
486 /// # for w in vec![1, 2, 3] {
487 /// # assert_eq!(stream.next().await.unwrap(), w);
488 /// # }
489 /// # }));
490 /// # }
491 /// ```
492 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
493 where
494 T: IntoIterator<Item = U>,
495 {
496 self.flat_map_ordered(q!(|v| v))
497 }
498
499 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
500 /// for the element type `T` to produce items in any order.
501 ///
502 /// If the optional is empty, the output stream is also empty. If the optional contains
503 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
504 /// in the output stream in non-deterministic order.
505 ///
506 /// # Example
507 /// ```rust
508 /// # #[cfg(feature = "deploy")] {
509 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
510 /// # use futures::StreamExt;
511 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
512 /// let tick = process.tick();
513 /// let optional = tick.optional_first_tick(q!(
514 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
515 /// ));
516 /// optional.flatten_unordered().all_ticks()
517 /// # }, |mut stream| async move {
518 /// // 1, 2, 3, but in no particular order
519 /// # let mut results = Vec::new();
520 /// # for _ in 0..3 {
521 /// # results.push(stream.next().await.unwrap());
522 /// # }
523 /// # results.sort();
524 /// # assert_eq!(results, vec![1, 2, 3]);
525 /// # }));
526 /// # }
527 /// ```
528 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
529 where
530 T: IntoIterator<Item = U>,
531 {
532 self.flat_map_unordered(q!(|v| v))
533 }
534
535 /// Creates an optional containing only the value if it satisfies a predicate `f`.
536 ///
537 /// If the optional is empty, the output optional is also empty. If the optional contains
538 /// a value and the predicate returns `true`, the output optional contains the same value.
539 /// If the predicate returns `false`, the output optional is empty.
540 ///
541 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
542 /// not modify or take ownership of the value. If you need to modify the value while filtering
543 /// use [`Optional::filter_map`] instead.
544 ///
545 /// # Example
546 /// ```rust
547 /// # #[cfg(feature = "deploy")] {
548 /// # use hydro_lang::prelude::*;
549 /// # use futures::StreamExt;
550 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
551 /// let tick = process.tick();
552 /// let optional = tick.optional_first_tick(q!(5));
553 /// optional.filter(q!(|&x| x > 3)).all_ticks()
554 /// # }, |mut stream| async move {
555 /// // 5
556 /// # assert_eq!(stream.next().await.unwrap(), 5);
557 /// # }));
558 /// # }
559 /// ```
560 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
561 where
562 F: Fn(&T) -> bool + 'a,
563 {
564 let f = f.splice_fn1_borrow_ctx(&self.location).into();
565 Optional::new(
566 self.location.clone(),
567 HydroNode::Filter {
568 f,
569 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
570 metadata: self.location.new_node_metadata(Self::collection_kind()),
571 },
572 )
573 }
574
575 /// An operator that both filters and maps. It yields only the value if the supplied
576 /// closure `f` returns `Some(value)`.
577 ///
578 /// If the optional is empty, the output optional is also empty. If the optional contains
579 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
580 /// If the closure returns `None`, the output optional is empty.
581 ///
582 /// # Example
583 /// ```rust
584 /// # #[cfg(feature = "deploy")] {
585 /// # use hydro_lang::prelude::*;
586 /// # use futures::StreamExt;
587 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
588 /// let tick = process.tick();
589 /// let optional = tick.optional_first_tick(q!("42"));
590 /// optional
591 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
592 /// .all_ticks()
593 /// # }, |mut stream| async move {
594 /// // 42
595 /// # assert_eq!(stream.next().await.unwrap(), 42);
596 /// # }));
597 /// # }
598 /// ```
599 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
600 where
601 F: Fn(T) -> Option<U> + 'a,
602 {
603 let f = f.splice_fn1_ctx(&self.location).into();
604 Optional::new(
605 self.location.clone(),
606 HydroNode::FilterMap {
607 f,
608 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
609 metadata: self
610 .location
611 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
612 },
613 )
614 }
615
    /// Combines this optional with another [`Singleton`] or [`Optional`] by tupling their values.
    ///
    /// If the other value is an [`Optional`], the output will be non-null only if the argument is
    /// non-null. This is useful for combining several pieces of state together.
620 ///
621 /// # Example
622 /// ```rust
623 /// # #[cfg(feature = "deploy")] {
624 /// # use hydro_lang::prelude::*;
625 /// # use futures::StreamExt;
626 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
627 /// let tick = process.tick();
628 /// let numbers = process
629 /// .source_iter(q!(vec![123, 456, 789]))
630 /// .batch(&tick, nondet!(/** test */));
631 /// let min = numbers.clone().min(); // Optional
632 /// let max = numbers.max(); // Optional
633 /// min.zip(max).all_ticks()
634 /// # }, |mut stream| async move {
635 /// // [(123, 789)]
636 /// # for w in vec![(123, 789)] {
637 /// # assert_eq!(stream.next().await.unwrap(), w);
638 /// # }
639 /// # }));
640 /// # }
641 /// ```
642 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
643 where
644 B: IsBounded,
645 {
646 let other: Optional<O, L, B> = other.into();
647 check_matching_location(&self.location, &other.location);
648
649 if L::is_top_level()
650 && let Some(tick) = self.location.try_tick()
651 {
652 let out = zip_inside_tick(
653 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
654 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
655 )
656 .latest();
657
658 Optional::new(
659 out.location.clone(),
660 out.ir_node.replace(HydroNode::Placeholder),
661 )
662 } else {
663 zip_inside_tick(self, other)
664 }
665 }
666
667 /// Passes through `self` when it has a value, otherwise passes through `other`.
668 ///
669 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
670 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
671 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
672 ///
673 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
674 /// of the inputs change (including to/from null states).
675 ///
676 /// # Example
677 /// ```rust
678 /// # #[cfg(feature = "deploy")] {
679 /// # use hydro_lang::prelude::*;
680 /// # use futures::StreamExt;
681 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
682 /// let tick = process.tick();
683 /// // ticks are lazy by default, forces the second tick to run
684 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
685 ///
686 /// let some_first_tick = tick.optional_first_tick(q!(123));
687 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
688 /// some_first_tick.or(some_second_tick).all_ticks()
689 /// # }, |mut stream| async move {
690 /// // [123 /* first tick */, 456 /* second tick */]
691 /// # for w in vec![123, 456] {
692 /// # assert_eq!(stream.next().await.unwrap(), w);
693 /// # }
694 /// # }));
695 /// # }
696 /// ```
697 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
698 check_matching_location(&self.location, &other.location);
699
700 if L::is_top_level()
701 && !B::BOUNDED // only if unbounded we need to use a tick
702 && let Some(tick) = self.location.try_tick()
703 {
704 let out = or_inside_tick(
705 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
706 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
707 )
708 .latest();
709
710 Optional::new(
711 out.location.clone(),
712 out.ir_node.replace(HydroNode::Placeholder),
713 )
714 } else {
715 Optional::new(
716 self.location.clone(),
717 HydroNode::ChainFirst {
718 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
719 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
720 metadata: self.location.new_node_metadata(Self::collection_kind()),
721 },
722 )
723 }
724 }
725
726 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
727 ///
728 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
729 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
730 ///
731 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
732 /// of the inputs change (including to/from null states).
733 ///
734 /// # Example
735 /// ```rust
736 /// # #[cfg(feature = "deploy")] {
737 /// # use hydro_lang::prelude::*;
738 /// # use futures::StreamExt;
739 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
740 /// let tick = process.tick();
741 /// // ticks are lazy by default, forces the later ticks to run
742 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
743 ///
744 /// let some_first_tick = tick.optional_first_tick(q!(123));
745 /// some_first_tick
746 /// .unwrap_or(tick.singleton(q!(456)))
747 /// .all_ticks()
748 /// # }, |mut stream| async move {
749 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
750 /// # for w in vec![123, 456, 456, 456] {
751 /// # assert_eq!(stream.next().await.unwrap(), w);
752 /// # }
753 /// # }));
754 /// # }
755 /// ```
756 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
757 let res_option = self.or(other.into());
758 Singleton::new(
759 res_option.location.clone(),
760 HydroNode::Cast {
761 inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
762 metadata: res_option
763 .location
764 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
765 },
766 )
767 }
768
769 /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
770 ///
771 /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
772 /// [`Optional`] when the default value of the type is a suitable fallback.
773 ///
774 /// # Example
775 /// ```rust
776 /// # #[cfg(feature = "deploy")] {
777 /// # use hydro_lang::prelude::*;
778 /// # use futures::StreamExt;
779 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
780 /// let tick = process.tick();
781 /// // ticks are lazy by default, forces the later ticks to run
782 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
783 ///
784 /// let some_first_tick = tick.optional_first_tick(q!(123i32));
785 /// some_first_tick.unwrap_or_default().all_ticks()
786 /// # }, |mut stream| async move {
787 /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
788 /// # for w in vec![123, 0, 0, 0] {
789 /// # assert_eq!(stream.next().await.unwrap(), w);
790 /// # }
791 /// # }));
792 /// # }
793 /// ```
794 pub fn unwrap_or_default(self) -> Singleton<T, L, B>
795 where
796 T: Default + Clone,
797 {
798 self.into_singleton().map(q!(|v| v.unwrap_or_default()))
799 }
800
801 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
802 ///
803 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
804 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
805 /// so that Hydro can skip any computation on null values.
806 ///
807 /// # Example
808 /// ```rust
809 /// # #[cfg(feature = "deploy")] {
810 /// # use hydro_lang::prelude::*;
811 /// # use futures::StreamExt;
812 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
813 /// let tick = process.tick();
814 /// // ticks are lazy by default, forces the later ticks to run
815 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
816 ///
817 /// let some_first_tick = tick.optional_first_tick(q!(123));
818 /// some_first_tick.into_singleton().all_ticks()
819 /// # }, |mut stream| async move {
820 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
821 /// # for w in vec![Some(123), None, None, None] {
822 /// # assert_eq!(stream.next().await.unwrap(), w);
823 /// # }
824 /// # }));
825 /// # }
826 /// ```
827 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
828 where
829 T: Clone,
830 {
831 let none: syn::Expr = parse_quote!(::std::option::Option::None);
832
833 let none_singleton = Singleton::new(
834 self.location.clone(),
835 HydroNode::SingletonSource {
836 value: none.into(),
837 first_tick_only: false,
838 metadata: self
839 .location
840 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
841 },
842 );
843
844 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
845 }
846
847 /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
848 ///
849 /// # Example
850 /// ```rust
851 /// # #[cfg(feature = "deploy")] {
852 /// # use hydro_lang::prelude::*;
853 /// # use futures::StreamExt;
854 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
855 /// let tick = process.tick();
856 /// // ticks are lazy by default, forces the second tick to run
857 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
858 ///
859 /// let some_first_tick = tick.optional_first_tick(q!(42));
860 /// some_first_tick.is_some().all_ticks()
861 /// # }, |mut stream| async move {
862 /// // [true /* first tick */, false /* second tick */, ...]
863 /// # for w in vec![true, false] {
864 /// # assert_eq!(stream.next().await.unwrap(), w);
865 /// # }
866 /// # }));
867 /// # }
868 /// ```
869 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
870 pub fn is_some(self) -> Singleton<bool, L, B> {
871 self.map(q!(|_| ()))
872 .into_singleton()
873 .map(q!(|o| o.is_some()))
874 }
875
876 /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
877 ///
878 /// # Example
879 /// ```rust
880 /// # #[cfg(feature = "deploy")] {
881 /// # use hydro_lang::prelude::*;
882 /// # use futures::StreamExt;
883 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
884 /// let tick = process.tick();
885 /// // ticks are lazy by default, forces the second tick to run
886 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
887 ///
888 /// let some_first_tick = tick.optional_first_tick(q!(42));
889 /// some_first_tick.is_none().all_ticks()
890 /// # }, |mut stream| async move {
891 /// // [false /* first tick */, true /* second tick */, ...]
892 /// # for w in vec![false, true] {
893 /// # assert_eq!(stream.next().await.unwrap(), w);
894 /// # }
895 /// # }));
896 /// # }
897 /// ```
898 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
899 pub fn is_none(self) -> Singleton<bool, L, B> {
900 self.map(q!(|_| ()))
901 .into_singleton()
902 .map(q!(|o| o.is_none()))
903 }
904
905 /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
906 /// values are equal, `false` otherwise (including when either is null).
907 ///
908 /// # Example
909 /// ```rust
910 /// # #[cfg(feature = "deploy")] {
911 /// # use hydro_lang::prelude::*;
912 /// # use futures::StreamExt;
913 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
914 /// let tick = process.tick();
915 /// // ticks are lazy by default, forces the second tick to run
916 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
917 ///
918 /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
919 /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
920 /// a.is_some_and_equals(b).all_ticks()
921 /// # }, |mut stream| async move {
922 /// // [true, false]
923 /// # for w in vec![true, false] {
924 /// # assert_eq!(stream.next().await.unwrap(), w);
925 /// # }
926 /// # }));
927 /// # }
928 /// ```
929 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
930 pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
931 where
932 T: PartialEq + Clone,
933 B: IsBounded,
934 {
935 self.into_singleton()
936 .zip(other.into_singleton())
937 .map(q!(|(a, b)| a.is_some() && a == b))
938 }
939
940 /// An operator which allows you to "name" a `HydroNode`.
941 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
942 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
943 {
944 let mut node = self.ir_node.borrow_mut();
945 let metadata = node.metadata_mut();
946 metadata.tag = Some(name.to_owned());
947 }
948 self
949 }
950
951 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
952 /// implies that `B == Bounded`.
953 pub fn make_bounded(self) -> Optional<T, L, Bounded>
954 where
955 B: IsBounded,
956 {
957 Optional::new(
958 self.location.clone(),
959 self.ir_node.replace(HydroNode::Placeholder),
960 )
961 }
962
963 /// Clones this bounded optional into a tick, returning a optional that has the
964 /// same value as the outer optional. Because the outer optional is bounded, this
965 /// is deterministic because there is only a single immutable version.
966 pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
967 where
968 B: IsBounded,
969 T: Clone,
970 {
971 // TODO(shadaj): avoid printing simulator logs for this snapshot
972 self.snapshot(
973 tick,
974 nondet!(/** bounded top-level optional so deterministic */),
975 )
976 }
977
978 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
979 /// non-null. Otherwise, the stream is empty.
980 ///
981 /// # Example
982 /// ```rust
983 /// # #[cfg(feature = "deploy")] {
984 /// # use hydro_lang::prelude::*;
985 /// # use futures::StreamExt;
986 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
987 /// # let tick = process.tick();
988 /// # // ticks are lazy by default, forces the second tick to run
989 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
990 /// # let batch_first_tick = process
991 /// # .source_iter(q!(vec![]))
992 /// # .batch(&tick, nondet!(/** test */));
993 /// # let batch_second_tick = process
994 /// # .source_iter(q!(vec![123, 456]))
995 /// # .batch(&tick, nondet!(/** test */))
996 /// # .defer_tick(); // appears on the second tick
997 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
998 /// input_batch // first tick: [], second tick: [123, 456]
999 /// .clone()
1000 /// .max()
1001 /// .into_stream()
1002 /// .chain(input_batch)
1003 /// .all_ticks()
1004 /// # }, |mut stream| async move {
1005 /// // [456, 123, 456]
1006 /// # for w in vec![456, 123, 456] {
1007 /// # assert_eq!(stream.next().await.unwrap(), w);
1008 /// # }
1009 /// # }));
1010 /// # }
1011 /// ```
1012 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1013 where
1014 B: IsBounded,
1015 {
1016 Stream::new(
1017 self.location.clone(),
1018 HydroNode::Cast {
1019 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1020 metadata: self.location.new_node_metadata(Stream::<
1021 T,
1022 Tick<L>,
1023 Bounded,
1024 TotalOrder,
1025 ExactlyOnce,
1026 >::collection_kind()),
1027 },
1028 )
1029 }
1030
1031 /// Filters this optional, passing through the value if the boolean signal is `true`,
1032 /// otherwise the output is null.
1033 ///
1034 /// # Example
1035 /// ```rust
1036 /// # #[cfg(feature = "deploy")] {
1037 /// # use hydro_lang::prelude::*;
1038 /// # use futures::StreamExt;
1039 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1040 /// let tick = process.tick();
1041 /// // ticks are lazy by default, forces the second tick to run
1042 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1043 ///
1044 /// let some_first_tick = tick.optional_first_tick(q!(()));
1045 /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1046 /// let batch_first_tick = process
1047 /// .source_iter(q!(vec![456]))
1048 /// .batch(&tick, nondet!(/** test */));
1049 /// let batch_second_tick = process
1050 /// .source_iter(q!(vec![789]))
1051 /// .batch(&tick, nondet!(/** test */))
1052 /// .defer_tick();
1053 /// batch_first_tick.chain(batch_second_tick).first()
1054 /// .filter_if(signal)
1055 /// .unwrap_or(tick.singleton(q!(0)))
1056 /// .all_ticks()
1057 /// # }, |mut stream| async move {
1058 /// // [456, 0]
1059 /// # for w in vec![456, 0] {
1060 /// # assert_eq!(stream.next().await.unwrap(), w);
1061 /// # }
1062 /// # }));
1063 /// # }
1064 /// ```
1065 pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1066 where
1067 B: IsBounded,
1068 {
1069 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1070 }
1071
1072 /// Filters this optional, passing through the optional value if it is non-null **and** the
1073 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1074 ///
1075 /// Useful for conditionally processing, such as only emitting an optional's value outside
1076 /// a tick if some other condition is satisfied.
1077 ///
1078 /// # Example
1079 /// ```rust
1080 /// # #[cfg(feature = "deploy")] {
1081 /// # use hydro_lang::prelude::*;
1082 /// # use futures::StreamExt;
1083 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1084 /// let tick = process.tick();
1085 /// // ticks are lazy by default, forces the second tick to run
1086 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1087 ///
1088 /// let batch_first_tick = process
1089 /// .source_iter(q!(vec![]))
1090 /// .batch(&tick, nondet!(/** test */));
1091 /// let batch_second_tick = process
1092 /// .source_iter(q!(vec![456]))
1093 /// .batch(&tick, nondet!(/** test */))
1094 /// .defer_tick(); // appears on the second tick
1095 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1096 /// batch_first_tick.chain(batch_second_tick).first()
1097 /// .filter_if_some(some_on_first_tick)
1098 /// .unwrap_or(tick.singleton(q!(789)))
1099 /// .all_ticks()
1100 /// # }, |mut stream| async move {
1101 /// // [789, 789]
1102 /// # for w in vec![789, 789] {
1103 /// # assert_eq!(stream.next().await.unwrap(), w);
1104 /// # }
1105 /// # }));
1106 /// # }
1107 /// ```
1108 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1109 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1110 where
1111 B: IsBounded,
1112 {
1113 self.filter_if(signal.is_some())
1114 }
1115
1116 /// Filters this optional, passing through the optional value if it is non-null **and** the
1117 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1118 ///
1119 /// Useful for conditionally processing, such as only emitting an optional's value outside
1120 /// a tick if some other condition is satisfied.
1121 ///
1122 /// # Example
1123 /// ```rust
1124 /// # #[cfg(feature = "deploy")] {
1125 /// # use hydro_lang::prelude::*;
1126 /// # use futures::StreamExt;
1127 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1128 /// let tick = process.tick();
1129 /// // ticks are lazy by default, forces the second tick to run
1130 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1131 ///
1132 /// let batch_first_tick = process
1133 /// .source_iter(q!(vec![]))
1134 /// .batch(&tick, nondet!(/** test */));
1135 /// let batch_second_tick = process
1136 /// .source_iter(q!(vec![456]))
1137 /// .batch(&tick, nondet!(/** test */))
1138 /// .defer_tick(); // appears on the second tick
1139 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1140 /// batch_first_tick.chain(batch_second_tick).first()
1141 /// .filter_if_none(some_on_first_tick)
1142 /// .unwrap_or(tick.singleton(q!(789)))
1143 /// .all_ticks()
1144 /// # }, |mut stream| async move {
1145 /// // [789, 789]
1146 /// # for w in vec![789, 456] {
1147 /// # assert_eq!(stream.next().await.unwrap(), w);
1148 /// # }
1149 /// # }));
1150 /// # }
1151 /// ```
1152 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1153 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1154 where
1155 B: IsBounded,
1156 {
1157 self.filter_if(other.is_none())
1158 }
1159
1160 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1161 ///
1162 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1163 /// having a value, such as only releasing a piece of state if the node is the leader.
1164 ///
1165 /// # Example
1166 /// ```rust
1167 /// # #[cfg(feature = "deploy")] {
1168 /// # use hydro_lang::prelude::*;
1169 /// # use futures::StreamExt;
1170 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1171 /// let tick = process.tick();
1172 /// // ticks are lazy by default, forces the second tick to run
1173 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1174 ///
1175 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1176 /// some_on_first_tick
1177 /// .if_some_then(tick.singleton(q!(456)))
1178 /// .unwrap_or(tick.singleton(q!(123)))
1179 /// # .all_ticks()
1180 /// # }, |mut stream| async move {
1181 /// // 456 (first tick) ~> 123 (second tick onwards)
1182 /// # for w in vec![456, 123, 123] {
1183 /// # assert_eq!(stream.next().await.unwrap(), w);
1184 /// # }
1185 /// # }));
1186 /// # }
1187 /// ```
1188 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1189 pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1190 where
1191 B: IsBounded,
1192 {
1193 value.filter_if(self.is_some())
1194 }
1195}
1196
1197impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1198where
1199 L: Location<'a> + NoTick,
1200{
1201 /// Returns an optional value corresponding to the latest snapshot of the optional
1202 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1203 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1204 /// all snapshots of this optional into the atomic-associated tick will observe the
1205 /// same value each tick.
1206 ///
1207 /// # Non-Determinism
1208 /// Because this picks a snapshot of a optional whose value is continuously changing,
1209 /// the output optional has a non-deterministic value since the snapshot can be at an
1210 /// arbitrary point in time.
1211 pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1212 Optional::new(
1213 self.location.clone().tick,
1214 HydroNode::Batch {
1215 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1216 metadata: self
1217 .location
1218 .tick
1219 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1220 },
1221 )
1222 }
1223
1224 /// Returns this optional back into a top-level, asynchronous execution context where updates
1225 /// to the value will be asynchronously propagated.
1226 pub fn end_atomic(self) -> Optional<T, L, B> {
1227 Optional::new(
1228 self.location.tick.l.clone(),
1229 HydroNode::EndAtomic {
1230 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1231 metadata: self
1232 .location
1233 .tick
1234 .l
1235 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1236 },
1237 )
1238 }
1239}
1240
1241impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1242where
1243 L: Location<'a>,
1244{
1245 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1246 /// will observe the same version of the value and will be executed synchronously before any
1247 /// outputs are yielded (in [`Optional::end_atomic`]).
1248 ///
1249 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1250 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1251 /// a different version).
1252 ///
1253 /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
1254 /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
1255 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
1256 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
1257 let out_location = Atomic { tick: tick.clone() };
1258 Optional::new(
1259 out_location.clone(),
1260 HydroNode::BeginAtomic {
1261 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1262 metadata: out_location
1263 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1264 },
1265 )
1266 }
1267
1268 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1269 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1270 /// relevant data that contributed to the snapshot at tick `t`.
1271 ///
1272 /// # Non-Determinism
1273 /// Because this picks a snapshot of a optional whose value is continuously changing,
1274 /// the output optional has a non-deterministic value since the snapshot can be at an
1275 /// arbitrary point in time.
1276 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1277 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1278 Optional::new(
1279 tick.clone(),
1280 HydroNode::Batch {
1281 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1282 metadata: tick
1283 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1284 },
1285 )
1286 }
1287
1288 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1289 /// with order corresponding to increasing prefixes of data contributing to the optional.
1290 ///
1291 /// # Non-Determinism
1292 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1293 /// to non-deterministic batching and arrival of inputs, the output stream is
1294 /// non-deterministic.
1295 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1296 where
1297 L: NoTick,
1298 {
1299 let tick = self.location.tick();
1300 self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1301 }
1302
1303 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1304 /// value taken at various points in time. Because the input optional may be
1305 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1306 /// represent the value of the optional given some prefix of the streams leading up to
1307 /// it.
1308 ///
1309 /// # Non-Determinism
1310 /// The output stream is non-deterministic in which elements are sampled, since this
1311 /// is controlled by a clock.
1312 pub fn sample_every(
1313 self,
1314 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1315 nondet: NonDet,
1316 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1317 where
1318 L: NoTick + NoAtomic,
1319 {
1320 let samples = self.location.source_interval(interval, nondet);
1321 let tick = self.location.tick();
1322
1323 self.snapshot(&tick, nondet)
1324 .filter_if(samples.batch(&tick, nondet).first().is_some())
1325 .all_ticks()
1326 .weaken_retries()
1327 }
1328}
1329
1330impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1331where
1332 L: Location<'a>,
1333{
1334 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1335 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1336 /// null values).
1337 ///
1338 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1339 /// producing one element in the output for each (non-null) tick. This is useful for batched
1340 /// computations, where the results from each tick must be combined together.
1341 ///
1342 /// # Example
1343 /// ```rust
1344 /// # #[cfg(feature = "deploy")] {
1345 /// # use hydro_lang::prelude::*;
1346 /// # use futures::StreamExt;
1347 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1348 /// # let tick = process.tick();
1349 /// # // ticks are lazy by default, forces the second tick to run
1350 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1351 /// # let batch_first_tick = process
1352 /// # .source_iter(q!(vec![]))
1353 /// # .batch(&tick, nondet!(/** test */));
1354 /// # let batch_second_tick = process
1355 /// # .source_iter(q!(vec![1, 2, 3]))
1356 /// # .batch(&tick, nondet!(/** test */))
1357 /// # .defer_tick(); // appears on the second tick
1358 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1359 /// input_batch // first tick: [], second tick: [1, 2, 3]
1360 /// .max()
1361 /// .all_ticks()
1362 /// # }, |mut stream| async move {
1363 /// // [3]
1364 /// # for w in vec![3] {
1365 /// # assert_eq!(stream.next().await.unwrap(), w);
1366 /// # }
1367 /// # }));
1368 /// # }
1369 /// ```
1370 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1371 self.into_stream().all_ticks()
1372 }
1373
1374 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1375 /// which will stream the value computed in _each_ tick as a separate stream element.
1376 ///
1377 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1378 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1379 /// optional's [`Tick`] context.
1380 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1381 self.into_stream().all_ticks_atomic()
1382 }
1383
1384 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1385 /// be asynchronously updated with the latest value of the optional inside the tick, including
1386 /// whether the optional is null or not.
1387 ///
1388 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1389 /// tick that tracks the inner value. This is useful for getting the value as of the
1390 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1391 ///
1392 /// # Example
1393 /// ```rust
1394 /// # #[cfg(feature = "deploy")] {
1395 /// # use hydro_lang::prelude::*;
1396 /// # use futures::StreamExt;
1397 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1398 /// # let tick = process.tick();
1399 /// # // ticks are lazy by default, forces the second tick to run
1400 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1401 /// # let batch_first_tick = process
1402 /// # .source_iter(q!(vec![]))
1403 /// # .batch(&tick, nondet!(/** test */));
1404 /// # let batch_second_tick = process
1405 /// # .source_iter(q!(vec![1, 2, 3]))
1406 /// # .batch(&tick, nondet!(/** test */))
1407 /// # .defer_tick(); // appears on the second tick
1408 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1409 /// input_batch // first tick: [], second tick: [1, 2, 3]
1410 /// .max()
1411 /// .latest()
1412 /// # .into_singleton()
1413 /// # .sample_eager(nondet!(/** test */))
1414 /// # }, |mut stream| async move {
1415 /// // asynchronously changes from None ~> 3
1416 /// # for w in vec![None, Some(3)] {
1417 /// # assert_eq!(stream.next().await.unwrap(), w);
1418 /// # }
1419 /// # }));
1420 /// # }
1421 /// ```
1422 pub fn latest(self) -> Optional<T, L, Unbounded> {
1423 Optional::new(
1424 self.location.outer().clone(),
1425 HydroNode::YieldConcat {
1426 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1427 metadata: self
1428 .location
1429 .outer()
1430 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1431 },
1432 )
1433 }
1434
1435 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1436 /// be updated with the latest value of the optional inside the tick.
1437 ///
1438 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1439 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1440 /// optional's [`Tick`] context.
1441 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1442 let out_location = Atomic {
1443 tick: self.location.clone(),
1444 };
1445
1446 Optional::new(
1447 out_location.clone(),
1448 HydroNode::YieldConcat {
1449 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1450 metadata: out_location
1451 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1452 },
1453 )
1454 }
1455
1456 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1457 /// always has the state of `self` at tick `T - 1`.
1458 ///
1459 /// At tick `0`, the output optional is null, since there is no previous tick.
1460 ///
1461 /// This operator enables stateful iterative processing with ticks, by sending data from one
1462 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1463 ///
1464 /// # Example
1465 /// ```rust
1466 /// # #[cfg(feature = "deploy")] {
1467 /// # use hydro_lang::prelude::*;
1468 /// # use futures::StreamExt;
1469 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1470 /// let tick = process.tick();
1471 /// // ticks are lazy by default, forces the second tick to run
1472 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1473 ///
1474 /// let batch_first_tick = process
1475 /// .source_iter(q!(vec![1, 2]))
1476 /// .batch(&tick, nondet!(/** test */));
1477 /// let batch_second_tick = process
1478 /// .source_iter(q!(vec![3, 4]))
1479 /// .batch(&tick, nondet!(/** test */))
1480 /// .defer_tick(); // appears on the second tick
1481 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1482 /// .reduce(q!(|state, v| *state += v));
1483 ///
1484 /// current_tick_sum.clone().into_singleton().zip(
1485 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1486 /// ).all_ticks()
1487 /// # }, |mut stream| async move {
1488 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1489 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1490 /// # assert_eq!(stream.next().await.unwrap(), w);
1491 /// # }
1492 /// # }));
1493 /// # }
1494 /// ```
1495 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1496 Optional::new(
1497 self.location.clone(),
1498 HydroNode::DeferTick {
1499 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1500 metadata: self.location.new_node_metadata(Self::collection_kind()),
1501 },
1502 )
1503 }
1504}
1505
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::StreamExt;
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(feature = "deploy")]
    use super::Optional;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(feature = "deploy")]
    use crate::nondet::nondet;

    /// `a.or(a)` on an inhabited optional must still contain exactly one element.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let ext = builder.external::<()>();

        let tick = process.tick();
        let singleton_in_tick = tick.singleton(q!(123));
        let inhabited: Optional<_, _, _> = singleton_in_tick.into();
        let count_port = inhabited
            .clone()
            .or(inhabited)
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&ext);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&ext, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received = deployed.connect(count_port).await;

        deployment.start().await.unwrap();

        assert_eq!(received.next().await.unwrap(), 1);
    }

    /// `into_singleton` on a bounded top-level null optional must yield exactly one
    /// element per snapshot.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let ext = builder.external::<()>();

        let tick = process.tick();
        let always_none = process.singleton(q!(123)).filter(q!(|_| false));
        let wrapped = always_none.into_singleton();

        // Spinning source used to keep the tick running.
        let driver = process.spin();

        let count_port = wrapped
            .snapshot(&tick, nondet!(/** test */))
            .into_stream()
            .count()
            .zip(driver.batch(&tick, nondet!(/** test */)).count())
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&ext);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&ext, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received = deployed.connect(count_port).await;

        deployment.start().await.unwrap();

        // The count must be 1 on each of the first three observed ticks.
        for _ in 0..3 {
            assert_eq!(received.next().await.unwrap(), 1);
        }
    }

    /// Same as above, but the null optional is derived from an unbounded (`latest`) value.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let ext = builder.external::<()>();

        let tick = process.tick();
        let always_none = tick.singleton(q!(123)).latest().filter(q!(|_| false));
        let wrapped = always_none.into_singleton();

        // Spinning source used to keep the tick running.
        let driver = process.spin();

        let count_port = wrapped
            .snapshot(&tick, nondet!(/** test */))
            .into_stream()
            .count()
            .zip(driver.batch(&tick, nondet!(/** test */)).count())
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&ext);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&ext, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received = deployed.connect(count_port).await;

        deployment.start().await.unwrap();

        // The count must be 1 on each of the first three observed ticks.
        for _ in 0..3 {
            assert_eq!(received.next().await.unwrap(), 1);
        }
    }

    /// A top-level optional that is `Some` must emit its value exactly once via
    /// `into_stream`.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_some_into_stream_no_replay() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let total = numbers.fold(q!(|| 0), q!(|a, b| *a += b));
        let kept = total.filter(q!(|_| true));

        let output = kept.into_stream().sim_output();

        builder.sim().exhaustive(async || {
            output.assert_yields_only([10]).await;
        });
    }

    /// A top-level optional that is null must emit nothing via `into_stream`.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_none_into_stream_no_replay() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let total = numbers.fold(q!(|| 0), q!(|a, b| *a += b));
        let dropped = total.filter(q!(|_| false));

        let output = dropped.into_stream().sim_output();

        builder.sim().exhaustive(async || {
            output.assert_yields_only([] as [i32; 0]).await;
        });
    }
}