hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
24/// A *nullable* Rust value that can asynchronously change over time.
25///
26/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
27/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
28/// asynchronously change over time, including becoming present of uninhabited.
29///
30/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
31/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this optional (when it is not null)
35/// - `Loc`: the [`Location`] where the optional is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Optional<Type, Loc, Bound: Boundedness> {
38 pub(crate) location: Loc,
39 pub(crate) ir_node: RefCell<HydroNode>,
40
41 _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
45where
46 T: Clone,
47 L: Location<'a> + NoTick,
48{
49 fn from(value: Optional<T, L, Bounded>) -> Self {
50 let tick = value.location().tick();
51 value.clone_into_tick(&tick).latest()
52 }
53}
54
55impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
56where
57 L: Location<'a>,
58{
59 fn defer_tick(self) -> Self {
60 Optional::defer_tick(self)
61 }
62}
63
64impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
65where
66 L: Location<'a>,
67{
68 type Location = Tick<L>;
69
70 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
71 Optional::new(
72 location.clone(),
73 HydroNode::CycleSource {
74 ident,
75 metadata: location.new_node_metadata(Self::collection_kind()),
76 },
77 )
78 }
79}
80
81impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
82where
83 L: Location<'a>,
84{
85 type Location = Tick<L>;
86
87 fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
88 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
89 location.clone(),
90 HydroNode::DeferTick {
91 input: Box::new(HydroNode::CycleSource {
92 ident,
93 metadata: location.new_node_metadata(Self::collection_kind()),
94 }),
95 metadata: location
96 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
97 },
98 );
99
100 from_previous_tick.or(initial.filter_if_some(location.optional_first_tick(q!(()))))
101 }
102}
103
104impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
105where
106 L: Location<'a>,
107{
108 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
109 assert_eq!(
110 Location::id(&self.location),
111 expected_location,
112 "locations do not match"
113 );
114 self.location
115 .flow_state()
116 .borrow_mut()
117 .push_root(HydroRoot::CycleSink {
118 ident,
119 input: Box::new(self.ir_node.into_inner()),
120 op_metadata: HydroIrOpMetadata::new(),
121 });
122 }
123}
124
125impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
126where
127 L: Location<'a>,
128{
129 type Location = Tick<L>;
130
131 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
132 Optional::new(
133 location.clone(),
134 HydroNode::CycleSource {
135 ident,
136 metadata: location.new_node_metadata(Self::collection_kind()),
137 },
138 )
139 }
140}
141
142impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
143where
144 L: Location<'a>,
145{
146 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
147 assert_eq!(
148 Location::id(&self.location),
149 expected_location,
150 "locations do not match"
151 );
152 self.location
153 .flow_state()
154 .borrow_mut()
155 .push_root(HydroRoot::CycleSink {
156 ident,
157 input: Box::new(self.ir_node.into_inner()),
158 op_metadata: HydroIrOpMetadata::new(),
159 });
160 }
161}
162
163impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
164where
165 L: Location<'a> + NoTick,
166{
167 type Location = L;
168
169 fn create_source(ident: syn::Ident, location: L) -> Self {
170 Optional::new(
171 location.clone(),
172 HydroNode::CycleSource {
173 ident,
174 metadata: location.new_node_metadata(Self::collection_kind()),
175 },
176 )
177 }
178}
179
180impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
181where
182 L: Location<'a> + NoTick,
183{
184 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
185 assert_eq!(
186 Location::id(&self.location),
187 expected_location,
188 "locations do not match"
189 );
190 self.location
191 .flow_state()
192 .borrow_mut()
193 .push_root(HydroRoot::CycleSink {
194 ident,
195 input: Box::new(self.ir_node.into_inner()),
196 op_metadata: HydroIrOpMetadata::new(),
197 });
198 }
199}
200
201impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
202where
203 L: Location<'a>,
204{
205 fn from(singleton: Singleton<T, L, B>) -> Self {
206 Optional::new(
207 singleton.location.clone(),
208 HydroNode::Cast {
209 inner: Box::new(singleton.ir_node.into_inner()),
210 metadata: singleton
211 .location
212 .new_node_metadata(Self::collection_kind()),
213 },
214 )
215 }
216}
217
218#[cfg(stageleft_runtime)]
219fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
220 me: Optional<T, L, B>,
221 other: Optional<O, L, B>,
222) -> Optional<(T, O), L, B> {
223 check_matching_location(&me.location, &other.location);
224
225 Optional::new(
226 me.location.clone(),
227 HydroNode::CrossSingleton {
228 left: Box::new(me.ir_node.into_inner()),
229 right: Box::new(other.ir_node.into_inner()),
230 metadata: me
231 .location
232 .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
233 },
234 )
235}
236
237#[cfg(stageleft_runtime)]
238fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
239 me: Optional<T, L, B>,
240 other: Optional<T, L, B>,
241) -> Optional<T, L, B> {
242 check_matching_location(&me.location, &other.location);
243
244 Optional::new(
245 me.location.clone(),
246 HydroNode::ChainFirst {
247 first: Box::new(me.ir_node.into_inner()),
248 second: Box::new(other.ir_node.into_inner()),
249 metadata: me
250 .location
251 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
252 },
253 )
254}
255
256impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
257where
258 T: Clone,
259 L: Location<'a>,
260{
261 fn clone(&self) -> Self {
262 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
263 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
264 *self.ir_node.borrow_mut() = HydroNode::Tee {
265 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
266 metadata: self.location.new_node_metadata(Self::collection_kind()),
267 };
268 }
269
270 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
271 Optional {
272 location: self.location.clone(),
273 ir_node: HydroNode::Tee {
274 inner: TeeNode(inner.0.clone()),
275 metadata: metadata.clone(),
276 }
277 .into(),
278 _phantom: PhantomData,
279 }
280 } else {
281 unreachable!()
282 }
283 }
284}
285
286impl<'a, T, L, B: Boundedness> Optional<T, L, B>
287where
288 L: Location<'a>,
289{
290 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
291 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
292 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
293 Optional {
294 location,
295 ir_node: RefCell::new(ir_node),
296 _phantom: PhantomData,
297 }
298 }
299
300 pub(crate) fn collection_kind() -> CollectionKind {
301 CollectionKind::Optional {
302 bound: B::BOUND_KIND,
303 element_type: stageleft::quote_type::<T>().into(),
304 }
305 }
306
307 /// Returns the [`Location`] where this optional is being materialized.
308 pub fn location(&self) -> &L {
309 &self.location
310 }
311
312 /// Transforms the optional value by applying a function `f` to it,
313 /// continuously as the input is updated.
314 ///
315 /// Whenever the optional is empty, the output optional is also empty.
316 ///
317 /// # Example
318 /// ```rust
319 /// # #[cfg(feature = "deploy")] {
320 /// # use hydro_lang::prelude::*;
321 /// # use futures::StreamExt;
322 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
323 /// let tick = process.tick();
324 /// let optional = tick.optional_first_tick(q!(1));
325 /// optional.map(q!(|v| v + 1)).all_ticks()
326 /// # }, |mut stream| async move {
327 /// // 2
328 /// # assert_eq!(stream.next().await.unwrap(), 2);
329 /// # }));
330 /// # }
331 /// ```
332 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
333 where
334 F: Fn(T) -> U + 'a,
335 {
336 let f = f.splice_fn1_ctx(&self.location).into();
337 Optional::new(
338 self.location.clone(),
339 HydroNode::Map {
340 f,
341 input: Box::new(self.ir_node.into_inner()),
342 metadata: self
343 .location
344 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
345 },
346 )
347 }
348
349 /// Transforms the optional value by applying a function `f` to it and then flattening
350 /// the result into a stream, preserving the order of elements.
351 ///
352 /// If the optional is empty, the output stream is also empty. If the optional contains
353 /// a value, `f` is applied to produce an iterator, and all items from that iterator
354 /// are emitted in the output stream in deterministic order.
355 ///
356 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
357 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
358 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
359 ///
360 /// # Example
361 /// ```rust
362 /// # #[cfg(feature = "deploy")] {
363 /// # use hydro_lang::prelude::*;
364 /// # use futures::StreamExt;
365 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
366 /// let tick = process.tick();
367 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
368 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
369 /// # }, |mut stream| async move {
370 /// // 1, 2, 3
371 /// # for w in vec![1, 2, 3] {
372 /// # assert_eq!(stream.next().await.unwrap(), w);
373 /// # }
374 /// # }));
375 /// # }
376 /// ```
377 pub fn flat_map_ordered<U, I, F>(
378 self,
379 f: impl IntoQuotedMut<'a, F, L>,
380 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
381 where
382 I: IntoIterator<Item = U>,
383 F: Fn(T) -> I + 'a,
384 {
385 let f = f.splice_fn1_ctx(&self.location).into();
386 Stream::new(
387 self.location.clone(),
388 HydroNode::FlatMap {
389 f,
390 input: Box::new(self.ir_node.into_inner()),
391 metadata: self.location.new_node_metadata(
392 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
393 ),
394 },
395 )
396 }
397
398 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
399 /// for the output type `I` to produce items in any order.
400 ///
401 /// If the optional is empty, the output stream is also empty. If the optional contains
402 /// a value, `f` is applied to produce an iterator, and all items from that iterator
403 /// are emitted in the output stream in non-deterministic order.
404 ///
405 /// # Example
406 /// ```rust
407 /// # #[cfg(feature = "deploy")] {
408 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
409 /// # use futures::StreamExt;
410 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
411 /// let tick = process.tick();
412 /// let optional = tick.optional_first_tick(q!(
413 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
414 /// ));
415 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
416 /// # }, |mut stream| async move {
417 /// // 1, 2, 3, but in no particular order
418 /// # let mut results = Vec::new();
419 /// # for _ in 0..3 {
420 /// # results.push(stream.next().await.unwrap());
421 /// # }
422 /// # results.sort();
423 /// # assert_eq!(results, vec![1, 2, 3]);
424 /// # }));
425 /// # }
426 /// ```
427 pub fn flat_map_unordered<U, I, F>(
428 self,
429 f: impl IntoQuotedMut<'a, F, L>,
430 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
431 where
432 I: IntoIterator<Item = U>,
433 F: Fn(T) -> I + 'a,
434 {
435 let f = f.splice_fn1_ctx(&self.location).into();
436 Stream::new(
437 self.location.clone(),
438 HydroNode::FlatMap {
439 f,
440 input: Box::new(self.ir_node.into_inner()),
441 metadata: self
442 .location
443 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
444 },
445 )
446 }
447
448 /// Flattens the optional value into a stream, preserving the order of elements.
449 ///
450 /// If the optional is empty, the output stream is also empty. If the optional contains
451 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
452 /// in the output stream in deterministic order.
453 ///
454 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
455 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
456 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
457 ///
458 /// # Example
459 /// ```rust
460 /// # #[cfg(feature = "deploy")] {
461 /// # use hydro_lang::prelude::*;
462 /// # use futures::StreamExt;
463 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
464 /// let tick = process.tick();
465 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
466 /// optional.flatten_ordered().all_ticks()
467 /// # }, |mut stream| async move {
468 /// // 1, 2, 3
469 /// # for w in vec![1, 2, 3] {
470 /// # assert_eq!(stream.next().await.unwrap(), w);
471 /// # }
472 /// # }));
473 /// # }
474 /// ```
475 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
476 where
477 T: IntoIterator<Item = U>,
478 {
479 self.flat_map_ordered(q!(|v| v))
480 }
481
482 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
483 /// for the element type `T` to produce items in any order.
484 ///
485 /// If the optional is empty, the output stream is also empty. If the optional contains
486 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
487 /// in the output stream in non-deterministic order.
488 ///
489 /// # Example
490 /// ```rust
491 /// # #[cfg(feature = "deploy")] {
492 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
493 /// # use futures::StreamExt;
494 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
495 /// let tick = process.tick();
496 /// let optional = tick.optional_first_tick(q!(
497 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
498 /// ));
499 /// optional.flatten_unordered().all_ticks()
500 /// # }, |mut stream| async move {
501 /// // 1, 2, 3, but in no particular order
502 /// # let mut results = Vec::new();
503 /// # for _ in 0..3 {
504 /// # results.push(stream.next().await.unwrap());
505 /// # }
506 /// # results.sort();
507 /// # assert_eq!(results, vec![1, 2, 3]);
508 /// # }));
509 /// # }
510 /// ```
511 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
512 where
513 T: IntoIterator<Item = U>,
514 {
515 self.flat_map_unordered(q!(|v| v))
516 }
517
518 /// Creates an optional containing only the value if it satisfies a predicate `f`.
519 ///
520 /// If the optional is empty, the output optional is also empty. If the optional contains
521 /// a value and the predicate returns `true`, the output optional contains the same value.
522 /// If the predicate returns `false`, the output optional is empty.
523 ///
524 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
525 /// not modify or take ownership of the value. If you need to modify the value while filtering
526 /// use [`Optional::filter_map`] instead.
527 ///
528 /// # Example
529 /// ```rust
530 /// # #[cfg(feature = "deploy")] {
531 /// # use hydro_lang::prelude::*;
532 /// # use futures::StreamExt;
533 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
534 /// let tick = process.tick();
535 /// let optional = tick.optional_first_tick(q!(5));
536 /// optional.filter(q!(|&x| x > 3)).all_ticks()
537 /// # }, |mut stream| async move {
538 /// // 5
539 /// # assert_eq!(stream.next().await.unwrap(), 5);
540 /// # }));
541 /// # }
542 /// ```
543 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
544 where
545 F: Fn(&T) -> bool + 'a,
546 {
547 let f = f.splice_fn1_borrow_ctx(&self.location).into();
548 Optional::new(
549 self.location.clone(),
550 HydroNode::Filter {
551 f,
552 input: Box::new(self.ir_node.into_inner()),
553 metadata: self.location.new_node_metadata(Self::collection_kind()),
554 },
555 )
556 }
557
558 /// An operator that both filters and maps. It yields only the value if the supplied
559 /// closure `f` returns `Some(value)`.
560 ///
561 /// If the optional is empty, the output optional is also empty. If the optional contains
562 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
563 /// If the closure returns `None`, the output optional is empty.
564 ///
565 /// # Example
566 /// ```rust
567 /// # #[cfg(feature = "deploy")] {
568 /// # use hydro_lang::prelude::*;
569 /// # use futures::StreamExt;
570 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
571 /// let tick = process.tick();
572 /// let optional = tick.optional_first_tick(q!("42"));
573 /// optional
574 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
575 /// .all_ticks()
576 /// # }, |mut stream| async move {
577 /// // 42
578 /// # assert_eq!(stream.next().await.unwrap(), 42);
579 /// # }));
580 /// # }
581 /// ```
582 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
583 where
584 F: Fn(T) -> Option<U> + 'a,
585 {
586 let f = f.splice_fn1_ctx(&self.location).into();
587 Optional::new(
588 self.location.clone(),
589 HydroNode::FilterMap {
590 f,
591 input: Box::new(self.ir_node.into_inner()),
592 metadata: self
593 .location
594 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
595 },
596 )
597 }
598
599 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
600 ///
601 /// If the other value is a [`Optional`], the output will be non-null only if the argument is
602 /// non-null. This is useful for combining several pieces of state together.
603 ///
604 /// # Example
605 /// ```rust
606 /// # #[cfg(feature = "deploy")] {
607 /// # use hydro_lang::prelude::*;
608 /// # use futures::StreamExt;
609 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
610 /// let tick = process.tick();
611 /// let numbers = process
612 /// .source_iter(q!(vec![123, 456, 789]))
613 /// .batch(&tick, nondet!(/** test */));
614 /// let min = numbers.clone().min(); // Optional
615 /// let max = numbers.max(); // Optional
616 /// min.zip(max).all_ticks()
617 /// # }, |mut stream| async move {
618 /// // [(123, 789)]
619 /// # for w in vec![(123, 789)] {
620 /// # assert_eq!(stream.next().await.unwrap(), w);
621 /// # }
622 /// # }));
623 /// # }
624 /// ```
625 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
626 where
627 O: Clone,
628 {
629 let other: Optional<O, L, B> = other.into();
630 check_matching_location(&self.location, &other.location);
631
632 if L::is_top_level()
633 && let Some(tick) = self.location.try_tick()
634 {
635 let out = zip_inside_tick(
636 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
637 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
638 )
639 .latest();
640
641 Optional::new(out.location, out.ir_node.into_inner())
642 } else {
643 zip_inside_tick(self, other)
644 }
645 }
646
647 /// Passes through `self` when it has a value, otherwise passes through `other`.
648 ///
649 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
650 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
651 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
652 ///
653 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
654 /// of the inputs change (including to/from null states).
655 ///
656 /// # Example
657 /// ```rust
658 /// # #[cfg(feature = "deploy")] {
659 /// # use hydro_lang::prelude::*;
660 /// # use futures::StreamExt;
661 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
662 /// let tick = process.tick();
663 /// // ticks are lazy by default, forces the second tick to run
664 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
665 ///
666 /// let some_first_tick = tick.optional_first_tick(q!(123));
667 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
668 /// some_first_tick.or(some_second_tick).all_ticks()
669 /// # }, |mut stream| async move {
670 /// // [123 /* first tick */, 456 /* second tick */]
671 /// # for w in vec![123, 456] {
672 /// # assert_eq!(stream.next().await.unwrap(), w);
673 /// # }
674 /// # }));
675 /// # }
676 /// ```
677 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
678 check_matching_location(&self.location, &other.location);
679
680 if L::is_top_level()
681 && !B::BOUNDED // only if unbounded we need to use a tick
682 && let Some(tick) = self.location.try_tick()
683 {
684 let out = or_inside_tick(
685 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
686 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
687 )
688 .latest();
689
690 Optional::new(out.location, out.ir_node.into_inner())
691 } else {
692 Optional::new(
693 self.location.clone(),
694 HydroNode::ChainFirst {
695 first: Box::new(self.ir_node.into_inner()),
696 second: Box::new(other.ir_node.into_inner()),
697 metadata: self.location.new_node_metadata(Self::collection_kind()),
698 },
699 )
700 }
701 }
702
703 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
704 ///
705 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
706 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
707 ///
708 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
709 /// of the inputs change (including to/from null states).
710 ///
711 /// # Example
712 /// ```rust
713 /// # #[cfg(feature = "deploy")] {
714 /// # use hydro_lang::prelude::*;
715 /// # use futures::StreamExt;
716 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
717 /// let tick = process.tick();
718 /// // ticks are lazy by default, forces the later ticks to run
719 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
720 ///
721 /// let some_first_tick = tick.optional_first_tick(q!(123));
722 /// some_first_tick
723 /// .unwrap_or(tick.singleton(q!(456)))
724 /// .all_ticks()
725 /// # }, |mut stream| async move {
726 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
727 /// # for w in vec![123, 456, 456, 456] {
728 /// # assert_eq!(stream.next().await.unwrap(), w);
729 /// # }
730 /// # }));
731 /// # }
732 /// ```
733 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
734 let res_option = self.or(other.into());
735 Singleton::new(
736 res_option.location.clone(),
737 HydroNode::Cast {
738 inner: Box::new(res_option.ir_node.into_inner()),
739 metadata: res_option
740 .location
741 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
742 },
743 )
744 }
745
746 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
747 ///
748 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
749 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
750 /// so that Hydro can skip any computation on null values.
751 ///
752 /// # Example
753 /// ```rust
754 /// # #[cfg(feature = "deploy")] {
755 /// # use hydro_lang::prelude::*;
756 /// # use futures::StreamExt;
757 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
758 /// let tick = process.tick();
759 /// // ticks are lazy by default, forces the later ticks to run
760 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
761 ///
762 /// let some_first_tick = tick.optional_first_tick(q!(123));
763 /// some_first_tick.into_singleton().all_ticks()
764 /// # }, |mut stream| async move {
765 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
766 /// # for w in vec![Some(123), None, None, None] {
767 /// # assert_eq!(stream.next().await.unwrap(), w);
768 /// # }
769 /// # }));
770 /// # }
771 /// ```
772 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
773 where
774 T: Clone,
775 {
776 let none: syn::Expr = parse_quote!(::std::option::Option::None);
777
778 let none_singleton = Singleton::new(
779 self.location.clone(),
780 HydroNode::SingletonSource {
781 value: none.into(),
782 metadata: self
783 .location
784 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
785 },
786 );
787
788 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
789 }
790
791 /// An operator which allows you to "name" a `HydroNode`.
792 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
793 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
794 {
795 let mut node = self.ir_node.borrow_mut();
796 let metadata = node.metadata_mut();
797 metadata.tag = Some(name.to_string());
798 }
799 self
800 }
801}
802
803impl<'a, T, L> Optional<T, L, Bounded>
804where
805 L: Location<'a>,
806{
807 /// Clones this bounded optional into a tick, returning a optional that has the
808 /// same value as the outer optional. Because the outer optional is bounded, this
809 /// is deterministic because there is only a single immutable version.
810 pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
811 where
812 T: Clone,
813 {
814 // TODO(shadaj): avoid printing simulator logs for this snapshot
815 self.snapshot(
816 tick,
817 nondet!(/** bounded top-level optional so deterministic */),
818 )
819 }
820
821 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
822 /// non-null. Otherwise, the stream is empty.
823 ///
824 /// # Example
825 /// ```rust
826 /// # #[cfg(feature = "deploy")] {
827 /// # use hydro_lang::prelude::*;
828 /// # use futures::StreamExt;
829 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
830 /// # let tick = process.tick();
831 /// # // ticks are lazy by default, forces the second tick to run
832 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
833 /// # let batch_first_tick = process
834 /// # .source_iter(q!(vec![]))
835 /// # .batch(&tick, nondet!(/** test */));
836 /// # let batch_second_tick = process
837 /// # .source_iter(q!(vec![123, 456]))
838 /// # .batch(&tick, nondet!(/** test */))
839 /// # .defer_tick(); // appears on the second tick
840 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
841 /// input_batch // first tick: [], second tick: [123, 456]
842 /// .clone()
843 /// .max()
844 /// .into_stream()
845 /// .chain(input_batch)
846 /// .all_ticks()
847 /// # }, |mut stream| async move {
848 /// // [456, 123, 456]
849 /// # for w in vec![456, 123, 456] {
850 /// # assert_eq!(stream.next().await.unwrap(), w);
851 /// # }
852 /// # }));
853 /// # }
854 /// ```
855 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
856 Stream::new(
857 self.location.clone(),
858 HydroNode::Cast {
859 inner: Box::new(self.ir_node.into_inner()),
860 metadata: self.location.new_node_metadata(Stream::<
861 T,
862 Tick<L>,
863 Bounded,
864 TotalOrder,
865 ExactlyOnce,
866 >::collection_kind()),
867 },
868 )
869 }
870
871 /// Filters this optional, passing through the optional value if it is non-null **and** the
872 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
873 ///
874 /// Useful for conditionally processing, such as only emitting an optional's value outside
875 /// a tick if some other condition is satisfied.
876 ///
877 /// # Example
878 /// ```rust
879 /// # #[cfg(feature = "deploy")] {
880 /// # use hydro_lang::prelude::*;
881 /// # use futures::StreamExt;
882 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
883 /// let tick = process.tick();
884 /// // ticks are lazy by default, forces the second tick to run
885 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
886 ///
887 /// let batch_first_tick = process
888 /// .source_iter(q!(vec![]))
889 /// .batch(&tick, nondet!(/** test */));
890 /// let batch_second_tick = process
891 /// .source_iter(q!(vec![456]))
892 /// .batch(&tick, nondet!(/** test */))
893 /// .defer_tick(); // appears on the second tick
894 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
895 /// batch_first_tick.chain(batch_second_tick).first()
896 /// .filter_if_some(some_on_first_tick)
897 /// .unwrap_or(tick.singleton(q!(789)))
898 /// .all_ticks()
899 /// # }, |mut stream| async move {
900 /// // [789, 789]
901 /// # for w in vec![789, 789] {
902 /// # assert_eq!(stream.next().await.unwrap(), w);
903 /// # }
904 /// # }));
905 /// # }
906 /// ```
907 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
908 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
909 }
910
911 /// Filters this optional, passing through the optional value if it is non-null **and** the
912 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
913 ///
914 /// Useful for conditionally processing, such as only emitting an optional's value outside
915 /// a tick if some other condition is satisfied.
916 ///
917 /// # Example
918 /// ```rust
919 /// # #[cfg(feature = "deploy")] {
920 /// # use hydro_lang::prelude::*;
921 /// # use futures::StreamExt;
922 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
923 /// let tick = process.tick();
924 /// // ticks are lazy by default, forces the second tick to run
925 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
926 ///
927 /// let batch_first_tick = process
928 /// .source_iter(q!(vec![]))
929 /// .batch(&tick, nondet!(/** test */));
930 /// let batch_second_tick = process
931 /// .source_iter(q!(vec![456]))
932 /// .batch(&tick, nondet!(/** test */))
933 /// .defer_tick(); // appears on the second tick
934 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
935 /// batch_first_tick.chain(batch_second_tick).first()
936 /// .filter_if_none(some_on_first_tick)
937 /// .unwrap_or(tick.singleton(q!(789)))
938 /// .all_ticks()
939 /// # }, |mut stream| async move {
940 /// // [789, 789]
941 /// # for w in vec![789, 456] {
942 /// # assert_eq!(stream.next().await.unwrap(), w);
943 /// # }
944 /// # }));
945 /// # }
946 /// ```
947 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
948 self.filter_if_some(
949 other
950 .map(q!(|_| ()))
951 .into_singleton()
952 .filter(q!(|o| o.is_none())),
953 )
954 }
955
956 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
957 ///
958 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
959 /// having a value, such as only releasing a piece of state if the node is the leader.
960 ///
961 /// # Example
962 /// ```rust
963 /// # #[cfg(feature = "deploy")] {
964 /// # use hydro_lang::prelude::*;
965 /// # use futures::StreamExt;
966 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
967 /// let tick = process.tick();
968 /// // ticks are lazy by default, forces the second tick to run
969 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
970 ///
971 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
972 /// some_on_first_tick
973 /// .if_some_then(tick.singleton(q!(456)))
974 /// .unwrap_or(tick.singleton(q!(123)))
975 /// # .all_ticks()
976 /// # }, |mut stream| async move {
977 /// // 456 (first tick) ~> 123 (second tick onwards)
978 /// # for w in vec![456, 123, 123] {
979 /// # assert_eq!(stream.next().await.unwrap(), w);
980 /// # }
981 /// # }));
982 /// # }
983 /// ```
984 pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
985 value.filter_if_some(self)
986 }
987}
988
989impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
990where
991 L: Location<'a> + NoTick,
992{
993 /// Returns an optional value corresponding to the latest snapshot of the optional
994 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
995 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
996 /// all snapshots of this optional into the atomic-associated tick will observe the
997 /// same value each tick.
998 ///
999 /// # Non-Determinism
1000 /// Because this picks a snapshot of a optional whose value is continuously changing,
1001 /// the output optional has a non-deterministic value since the snapshot can be at an
1002 /// arbitrary point in time.
1003 pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1004 Optional::new(
1005 self.location.clone().tick,
1006 HydroNode::Batch {
1007 inner: Box::new(self.ir_node.into_inner()),
1008 metadata: self
1009 .location
1010 .tick
1011 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1012 },
1013 )
1014 }
1015
1016 /// Returns this optional back into a top-level, asynchronous execution context where updates
1017 /// to the value will be asynchronously propagated.
1018 pub fn end_atomic(self) -> Optional<T, L, B> {
1019 Optional::new(
1020 self.location.tick.l.clone(),
1021 HydroNode::EndAtomic {
1022 inner: Box::new(self.ir_node.into_inner()),
1023 metadata: self
1024 .location
1025 .tick
1026 .l
1027 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1028 },
1029 )
1030 }
1031}
1032
1033impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1034where
1035 L: Location<'a>,
1036{
1037 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1038 /// will observe the same version of the value and will be executed synchronously before any
1039 /// outputs are yielded (in [`Optional::end_atomic`]).
1040 ///
1041 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1042 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1043 /// a different version).
1044 ///
1045 /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
1046 /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
1047 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
1048 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
1049 let out_location = Atomic { tick: tick.clone() };
1050 Optional::new(
1051 out_location.clone(),
1052 HydroNode::BeginAtomic {
1053 inner: Box::new(self.ir_node.into_inner()),
1054 metadata: out_location
1055 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1056 },
1057 )
1058 }
1059
1060 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1061 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1062 /// relevant data that contributed to the snapshot at tick `t`.
1063 ///
1064 /// # Non-Determinism
1065 /// Because this picks a snapshot of a optional whose value is continuously changing,
1066 /// the output optional has a non-deterministic value since the snapshot can be at an
1067 /// arbitrary point in time.
1068 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1069 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1070 Optional::new(
1071 tick.clone(),
1072 HydroNode::Batch {
1073 inner: Box::new(self.ir_node.into_inner()),
1074 metadata: tick
1075 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1076 },
1077 )
1078 }
1079
1080 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1081 /// with order corresponding to increasing prefixes of data contributing to the optional.
1082 ///
1083 /// # Non-Determinism
1084 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1085 /// to non-deterministic batching and arrival of inputs, the output stream is
1086 /// non-deterministic.
1087 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1088 where
1089 L: NoTick,
1090 {
1091 let tick = self.location.tick();
1092 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
1093 }
1094
1095 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1096 /// value taken at various points in time. Because the input optional may be
1097 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1098 /// represent the value of the optional given some prefix of the streams leading up to
1099 /// it.
1100 ///
1101 /// # Non-Determinism
1102 /// The output stream is non-deterministic in which elements are sampled, since this
1103 /// is controlled by a clock.
1104 pub fn sample_every(
1105 self,
1106 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1107 nondet: NonDet,
1108 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1109 where
1110 L: NoTick + NoAtomic,
1111 {
1112 let samples = self.location.source_interval(interval, nondet);
1113 let tick = self.location.tick();
1114
1115 self.snapshot(&tick, nondet)
1116 .filter_if_some(samples.batch(&tick, nondet).first())
1117 .all_ticks()
1118 .weakest_retries()
1119 }
1120}
1121
1122impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1123where
1124 L: Location<'a>,
1125{
1126 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1127 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1128 /// null values).
1129 ///
1130 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1131 /// producing one element in the output for each (non-null) tick. This is useful for batched
1132 /// computations, where the results from each tick must be combined together.
1133 ///
1134 /// # Example
1135 /// ```rust
1136 /// # #[cfg(feature = "deploy")] {
1137 /// # use hydro_lang::prelude::*;
1138 /// # use futures::StreamExt;
1139 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1140 /// # let tick = process.tick();
1141 /// # // ticks are lazy by default, forces the second tick to run
1142 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1143 /// # let batch_first_tick = process
1144 /// # .source_iter(q!(vec![]))
1145 /// # .batch(&tick, nondet!(/** test */));
1146 /// # let batch_second_tick = process
1147 /// # .source_iter(q!(vec![1, 2, 3]))
1148 /// # .batch(&tick, nondet!(/** test */))
1149 /// # .defer_tick(); // appears on the second tick
1150 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1151 /// input_batch // first tick: [], second tick: [1, 2, 3]
1152 /// .max()
1153 /// .all_ticks()
1154 /// # }, |mut stream| async move {
1155 /// // [3]
1156 /// # for w in vec![3] {
1157 /// # assert_eq!(stream.next().await.unwrap(), w);
1158 /// # }
1159 /// # }));
1160 /// # }
1161 /// ```
1162 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1163 self.into_stream().all_ticks()
1164 }
1165
1166 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1167 /// which will stream the value computed in _each_ tick as a separate stream element.
1168 ///
1169 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1170 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1171 /// optional's [`Tick`] context.
1172 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1173 self.into_stream().all_ticks_atomic()
1174 }
1175
1176 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1177 /// be asynchronously updated with the latest value of the optional inside the tick, including
1178 /// whether the optional is null or not.
1179 ///
1180 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1181 /// tick that tracks the inner value. This is useful for getting the value as of the
1182 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1183 ///
1184 /// # Example
1185 /// ```rust
1186 /// # #[cfg(feature = "deploy")] {
1187 /// # use hydro_lang::prelude::*;
1188 /// # use futures::StreamExt;
1189 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1190 /// # let tick = process.tick();
1191 /// # // ticks are lazy by default, forces the second tick to run
1192 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1193 /// # let batch_first_tick = process
1194 /// # .source_iter(q!(vec![]))
1195 /// # .batch(&tick, nondet!(/** test */));
1196 /// # let batch_second_tick = process
1197 /// # .source_iter(q!(vec![1, 2, 3]))
1198 /// # .batch(&tick, nondet!(/** test */))
1199 /// # .defer_tick(); // appears on the second tick
1200 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1201 /// input_batch // first tick: [], second tick: [1, 2, 3]
1202 /// .max()
1203 /// .latest()
1204 /// # .into_singleton()
1205 /// # .sample_eager(nondet!(/** test */))
1206 /// # }, |mut stream| async move {
1207 /// // asynchronously changes from None ~> 3
1208 /// # for w in vec![None, Some(3)] {
1209 /// # assert_eq!(stream.next().await.unwrap(), w);
1210 /// # }
1211 /// # }));
1212 /// # }
1213 /// ```
1214 pub fn latest(self) -> Optional<T, L, Unbounded> {
1215 Optional::new(
1216 self.location.outer().clone(),
1217 HydroNode::YieldConcat {
1218 inner: Box::new(self.ir_node.into_inner()),
1219 metadata: self
1220 .location
1221 .outer()
1222 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1223 },
1224 )
1225 }
1226
1227 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1228 /// be updated with the latest value of the optional inside the tick.
1229 ///
1230 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1231 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1232 /// optional's [`Tick`] context.
1233 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1234 let out_location = Atomic {
1235 tick: self.location.clone(),
1236 };
1237
1238 Optional::new(
1239 out_location.clone(),
1240 HydroNode::YieldConcat {
1241 inner: Box::new(self.ir_node.into_inner()),
1242 metadata: out_location
1243 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1244 },
1245 )
1246 }
1247
1248 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1249 /// always has the state of `self` at tick `T - 1`.
1250 ///
1251 /// At tick `0`, the output optional is null, since there is no previous tick.
1252 ///
1253 /// This operator enables stateful iterative processing with ticks, by sending data from one
1254 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1255 ///
1256 /// # Example
1257 /// ```rust
1258 /// # #[cfg(feature = "deploy")] {
1259 /// # use hydro_lang::prelude::*;
1260 /// # use futures::StreamExt;
1261 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1262 /// let tick = process.tick();
1263 /// // ticks are lazy by default, forces the second tick to run
1264 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1265 ///
1266 /// let batch_first_tick = process
1267 /// .source_iter(q!(vec![1, 2]))
1268 /// .batch(&tick, nondet!(/** test */));
1269 /// let batch_second_tick = process
1270 /// .source_iter(q!(vec![3, 4]))
1271 /// .batch(&tick, nondet!(/** test */))
1272 /// .defer_tick(); // appears on the second tick
1273 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1274 /// .reduce(q!(|state, v| *state += v));
1275 ///
1276 /// current_tick_sum.clone().into_singleton().zip(
1277 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1278 /// ).all_ticks()
1279 /// # }, |mut stream| async move {
1280 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1281 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1282 /// # assert_eq!(stream.next().await.unwrap(), w);
1283 /// # }
1284 /// # }));
1285 /// # }
1286 /// ```
1287 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1288 Optional::new(
1289 self.location.clone(),
1290 HydroNode::DeferTick {
1291 input: Box::new(self.ir_node.into_inner()),
1292 metadata: self.location.new_node_metadata(Self::collection_kind()),
1293 },
1294 )
1295 }
1296}
1297
1298#[cfg(test)]
1299mod tests {
1300 #[cfg(feature = "deploy")]
1301 use futures::StreamExt;
1302 #[cfg(feature = "deploy")]
1303 use hydro_deploy::Deployment;
1304 use stageleft::q;
1305
1306 #[cfg(feature = "deploy")]
1307 use super::Optional;
1308 use crate::compile::builder::FlowBuilder;
1309 use crate::location::Location;
1310 #[cfg(feature = "deploy")]
1311 use crate::nondet::nondet;
1312
1313 #[cfg(feature = "deploy")]
1314 #[tokio::test]
1315 async fn optional_or_cardinality() {
1316 let mut deployment = Deployment::new();
1317
1318 let flow = FlowBuilder::new();
1319 let node = flow.process::<()>();
1320 let external = flow.external::<()>();
1321
1322 let node_tick = node.tick();
1323 let tick_singleton = node_tick.singleton(q!(123));
1324 let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1325 let counts = tick_optional_inhabited
1326 .clone()
1327 .or(tick_optional_inhabited)
1328 .into_stream()
1329 .count()
1330 .all_ticks()
1331 .send_bincode_external(&external);
1332
1333 let nodes = flow
1334 .with_process(&node, deployment.Localhost())
1335 .with_external(&external, deployment.Localhost())
1336 .deploy(&mut deployment);
1337
1338 deployment.deploy().await.unwrap();
1339
1340 let mut external_out = nodes.connect(counts).await;
1341
1342 deployment.start().await.unwrap();
1343
1344 assert_eq!(external_out.next().await.unwrap(), 1);
1345 }
1346
1347 #[cfg(feature = "deploy")]
1348 #[tokio::test]
1349 async fn into_singleton_top_level_none_cardinality() {
1350 let mut deployment = Deployment::new();
1351
1352 let flow = FlowBuilder::new();
1353 let node = flow.process::<()>();
1354 let external = flow.external::<()>();
1355
1356 let node_tick = node.tick();
1357 let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1358 let into_singleton = top_level_none.into_singleton();
1359
1360 let tick_driver = node.spin();
1361
1362 let counts = into_singleton
1363 .snapshot(&node_tick, nondet!(/** test */))
1364 .into_stream()
1365 .count()
1366 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1367 .map(q!(|(c, _)| c))
1368 .all_ticks()
1369 .send_bincode_external(&external);
1370
1371 let nodes = flow
1372 .with_process(&node, deployment.Localhost())
1373 .with_external(&external, deployment.Localhost())
1374 .deploy(&mut deployment);
1375
1376 deployment.deploy().await.unwrap();
1377
1378 let mut external_out = nodes.connect(counts).await;
1379
1380 deployment.start().await.unwrap();
1381
1382 assert_eq!(external_out.next().await.unwrap(), 1);
1383 assert_eq!(external_out.next().await.unwrap(), 1);
1384 assert_eq!(external_out.next().await.unwrap(), 1);
1385 }
1386
1387 #[cfg(feature = "deploy")]
1388 #[tokio::test]
1389 async fn into_singleton_unbounded_top_level_none_cardinality() {
1390 let mut deployment = Deployment::new();
1391
1392 let flow = FlowBuilder::new();
1393 let node = flow.process::<()>();
1394 let external = flow.external::<()>();
1395
1396 let node_tick = node.tick();
1397 let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1398 let into_singleton = top_level_none.into_singleton();
1399
1400 let tick_driver = node.spin();
1401
1402 let counts = into_singleton
1403 .snapshot(&node_tick, nondet!(/** test */))
1404 .into_stream()
1405 .count()
1406 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1407 .map(q!(|(c, _)| c))
1408 .all_ticks()
1409 .send_bincode_external(&external);
1410
1411 let nodes = flow
1412 .with_process(&node, deployment.Localhost())
1413 .with_external(&external, deployment.Localhost())
1414 .deploy(&mut deployment);
1415
1416 deployment.deploy().await.unwrap();
1417
1418 let mut external_out = nodes.connect(counts).await;
1419
1420 deployment.start().await.unwrap();
1421
1422 assert_eq!(external_out.next().await.unwrap(), 1);
1423 assert_eq!(external_out.next().await.unwrap(), 1);
1424 assert_eq!(external_out.next().await.unwrap(), 1);
1425 }
1426
1427 #[cfg(feature = "sim")]
1428 #[test]
1429 fn top_level_optional_some_into_stream_no_replay() {
1430 let flow = FlowBuilder::new();
1431 let node = flow.process::<()>();
1432
1433 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1434 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1435 let filtered_some = folded.filter(q!(|_| true));
1436
1437 let out_recv = filtered_some.into_stream().sim_output();
1438
1439 flow.sim().exhaustive(async || {
1440 out_recv.assert_yields_only([10]).await;
1441 });
1442 }
1443
1444 #[cfg(feature = "sim")]
1445 #[test]
1446 fn top_level_optional_none_into_stream_no_replay() {
1447 let flow = FlowBuilder::new();
1448 let node = flow.process::<()>();
1449
1450 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1451 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1452 let filtered_none = folded.filter(q!(|_| false));
1453
1454 let out_recv = filtered_none.into_stream().sim_output();
1455
1456 flow.sim().exhaustive(async || {
1457 out_recv.assert_yields_only([] as [i32; 0]).await;
1458 });
1459 }
1460}