hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
24/// A *nullable* Rust value that can asynchronously change over time.
25///
26/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
27/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
29///
30/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
31/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this optional (when it is not null)
35/// - `Loc`: the [`Location`] where the optional is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    /// The location where this optional is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this optional. It is kept in a [`RefCell`] because
    /// `Clone::clone` rewrites the node in place (into a `Tee`) while only holding `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker that ties the type parameters to the struct without storing them.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
43
// Lets a bounded, in-tick optional be used wherever a `DeferTick` collection is expected.
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): presumably resolves to the inherent `Optional::defer_tick` defined
        // elsewhere in this file (not visible here) rather than recursing into this trait
        // method — confirm before changing the call path.
        Optional::defer_tick(self)
    }
}
52
53impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
54where
55 L: Location<'a>,
56{
57 type Location = Tick<L>;
58
59 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
60 Optional::new(
61 location.clone(),
62 HydroNode::CycleSource {
63 ident,
64 metadata: location.new_node_metadata(Self::collection_kind()),
65 },
66 )
67 }
68}
69
70impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
71where
72 L: Location<'a>,
73{
74 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75 assert_eq!(
76 Location::id(&self.location),
77 expected_location,
78 "locations do not match"
79 );
80 self.location
81 .flow_state()
82 .borrow_mut()
83 .push_root(HydroRoot::CycleSink {
84 ident,
85 input: Box::new(self.ir_node.into_inner()),
86 op_metadata: HydroIrOpMetadata::new(),
87 });
88 }
89}
90
91impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
92where
93 L: Location<'a>,
94{
95 type Location = Tick<L>;
96
97 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
98 Optional::new(
99 location.clone(),
100 HydroNode::CycleSource {
101 ident,
102 metadata: location.new_node_metadata(Self::collection_kind()),
103 },
104 )
105 }
106}
107
108impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
109where
110 L: Location<'a>,
111{
112 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
113 assert_eq!(
114 Location::id(&self.location),
115 expected_location,
116 "locations do not match"
117 );
118 self.location
119 .flow_state()
120 .borrow_mut()
121 .push_root(HydroRoot::CycleSink {
122 ident,
123 input: Box::new(self.ir_node.into_inner()),
124 op_metadata: HydroIrOpMetadata::new(),
125 });
126 }
127}
128
129impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
130where
131 L: Location<'a> + NoTick,
132{
133 type Location = L;
134
135 fn create_source(ident: syn::Ident, location: L) -> Self {
136 Optional::new(
137 location.clone(),
138 HydroNode::CycleSource {
139 ident,
140 metadata: location.new_node_metadata(Self::collection_kind()),
141 },
142 )
143 }
144}
145
146impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
147where
148 L: Location<'a> + NoTick,
149{
150 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
151 assert_eq!(
152 Location::id(&self.location),
153 expected_location,
154 "locations do not match"
155 );
156 self.location
157 .flow_state()
158 .borrow_mut()
159 .push_root(HydroRoot::CycleSink {
160 ident,
161 input: Box::new(self.ir_node.into_inner()),
162 op_metadata: HydroIrOpMetadata::new(),
163 });
164 }
165}
166
167impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
168where
169 L: Location<'a>,
170{
171 fn from(singleton: Optional<T, L, Bounded>) -> Self {
172 Optional::new(singleton.location, singleton.ir_node.into_inner())
173 }
174}
175
176impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
177where
178 L: Location<'a>,
179{
180 fn from(singleton: Singleton<T, L, B>) -> Self {
181 Optional::new(
182 singleton.location.clone(),
183 HydroNode::Cast {
184 inner: Box::new(singleton.ir_node.into_inner()),
185 metadata: singleton
186 .location
187 .new_node_metadata(Self::collection_kind()),
188 },
189 )
190 }
191}
192
// Pairs two optionals at the same location through a `CrossSingleton` IR node.
// The locations of both inputs are checked with `check_matching_location` first.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());
    let left = Box::new(me.ir_node.into_inner());
    let right = Box::new(other.ir_node.into_inner());

    Optional::new(
        me.location,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
211
// Combines two optionals at the same location through a `ChainFirst` IR node, with
// `me` as the first input and `other` as the second.
// The locations of both inputs are checked with `check_matching_location` first.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());
    let first = Box::new(me.ir_node.into_inner());
    let second = Box::new(other.ir_node.into_inner());

    Optional::new(
        me.location,
        HydroNode::ChainFirst {
            first,
            second,
            metadata,
        },
    )
}
230
231impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
232where
233 T: Clone,
234 L: Location<'a>,
235{
236 fn clone(&self) -> Self {
237 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
238 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
239 *self.ir_node.borrow_mut() = HydroNode::Tee {
240 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
241 metadata: self.location.new_node_metadata(Self::collection_kind()),
242 };
243 }
244
245 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
246 Optional {
247 location: self.location.clone(),
248 ir_node: HydroNode::Tee {
249 inner: TeeNode(inner.0.clone()),
250 metadata: metadata.clone(),
251 }
252 .into(),
253 _phantom: PhantomData,
254 }
255 } else {
256 unreachable!()
257 }
258 }
259}
260
261impl<'a, T, L, B: Boundedness> Optional<T, L, B>
262where
263 L: Location<'a>,
264{
    /// Wraps `ir_node` as an [`Optional`] materialized at `location`.
    ///
    /// In debug builds, asserts that the metadata attached to `ir_node` records the same
    /// location id as `location` and the collection kind of this optional type.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
        Optional {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
274
275 pub(crate) fn collection_kind() -> CollectionKind {
276 CollectionKind::Optional {
277 bound: B::BOUND_KIND,
278 element_type: stageleft::quote_type::<T>().into(),
279 }
280 }
281
    /// Returns the [`Location`] where this optional is being materialized.
    ///
    /// The location is borrowed from the optional; the optional itself is not consumed.
    pub fn location(&self) -> &L {
        &self.location
    }
286
287 /// Transforms the optional value by applying a function `f` to it,
288 /// continuously as the input is updated.
289 ///
290 /// Whenever the optional is empty, the output optional is also empty.
291 ///
292 /// # Example
293 /// ```rust
294 /// # #[cfg(feature = "deploy")] {
295 /// # use hydro_lang::prelude::*;
296 /// # use futures::StreamExt;
297 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
298 /// let tick = process.tick();
299 /// let optional = tick.optional_first_tick(q!(1));
300 /// optional.map(q!(|v| v + 1)).all_ticks()
301 /// # }, |mut stream| async move {
302 /// // 2
303 /// # assert_eq!(stream.next().await.unwrap(), 2);
304 /// # }));
305 /// # }
306 /// ```
307 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
308 where
309 F: Fn(T) -> U + 'a,
310 {
311 let f = f.splice_fn1_ctx(&self.location).into();
312 Optional::new(
313 self.location.clone(),
314 HydroNode::Map {
315 f,
316 input: Box::new(self.ir_node.into_inner()),
317 metadata: self
318 .location
319 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
320 },
321 )
322 }
323
324 /// Transforms the optional value by applying a function `f` to it and then flattening
325 /// the result into a stream, preserving the order of elements.
326 ///
327 /// If the optional is empty, the output stream is also empty. If the optional contains
328 /// a value, `f` is applied to produce an iterator, and all items from that iterator
329 /// are emitted in the output stream in deterministic order.
330 ///
331 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
332 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
333 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
334 ///
335 /// # Example
336 /// ```rust
337 /// # #[cfg(feature = "deploy")] {
338 /// # use hydro_lang::prelude::*;
339 /// # use futures::StreamExt;
340 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
341 /// let tick = process.tick();
342 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
343 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
344 /// # }, |mut stream| async move {
345 /// // 1, 2, 3
346 /// # for w in vec![1, 2, 3] {
347 /// # assert_eq!(stream.next().await.unwrap(), w);
348 /// # }
349 /// # }));
350 /// # }
351 /// ```
352 pub fn flat_map_ordered<U, I, F>(
353 self,
354 f: impl IntoQuotedMut<'a, F, L>,
355 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
356 where
357 I: IntoIterator<Item = U>,
358 F: Fn(T) -> I + 'a,
359 {
360 let f = f.splice_fn1_ctx(&self.location).into();
361 Stream::new(
362 self.location.clone(),
363 HydroNode::FlatMap {
364 f,
365 input: Box::new(self.ir_node.into_inner()),
366 metadata: self.location.new_node_metadata(
367 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
368 ),
369 },
370 )
371 }
372
373 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
374 /// for the output type `I` to produce items in any order.
375 ///
376 /// If the optional is empty, the output stream is also empty. If the optional contains
377 /// a value, `f` is applied to produce an iterator, and all items from that iterator
378 /// are emitted in the output stream in non-deterministic order.
379 ///
380 /// # Example
381 /// ```rust
382 /// # #[cfg(feature = "deploy")] {
383 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
384 /// # use futures::StreamExt;
385 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
386 /// let tick = process.tick();
387 /// let optional = tick.optional_first_tick(q!(
388 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
389 /// ));
390 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
391 /// # }, |mut stream| async move {
392 /// // 1, 2, 3, but in no particular order
393 /// # let mut results = Vec::new();
394 /// # for _ in 0..3 {
395 /// # results.push(stream.next().await.unwrap());
396 /// # }
397 /// # results.sort();
398 /// # assert_eq!(results, vec![1, 2, 3]);
399 /// # }));
400 /// # }
401 /// ```
402 pub fn flat_map_unordered<U, I, F>(
403 self,
404 f: impl IntoQuotedMut<'a, F, L>,
405 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
406 where
407 I: IntoIterator<Item = U>,
408 F: Fn(T) -> I + 'a,
409 {
410 let f = f.splice_fn1_ctx(&self.location).into();
411 Stream::new(
412 self.location.clone(),
413 HydroNode::FlatMap {
414 f,
415 input: Box::new(self.ir_node.into_inner()),
416 metadata: self
417 .location
418 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
419 },
420 )
421 }
422
423 /// Flattens the optional value into a stream, preserving the order of elements.
424 ///
425 /// If the optional is empty, the output stream is also empty. If the optional contains
426 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
427 /// in the output stream in deterministic order.
428 ///
429 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
430 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
431 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
432 ///
433 /// # Example
434 /// ```rust
435 /// # #[cfg(feature = "deploy")] {
436 /// # use hydro_lang::prelude::*;
437 /// # use futures::StreamExt;
438 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
439 /// let tick = process.tick();
440 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
441 /// optional.flatten_ordered().all_ticks()
442 /// # }, |mut stream| async move {
443 /// // 1, 2, 3
444 /// # for w in vec![1, 2, 3] {
445 /// # assert_eq!(stream.next().await.unwrap(), w);
446 /// # }
447 /// # }));
448 /// # }
449 /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure.
        self.flat_map_ordered(q!(|v| v))
    }
456
457 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
458 /// for the element type `T` to produce items in any order.
459 ///
460 /// If the optional is empty, the output stream is also empty. If the optional contains
461 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
462 /// in the output stream in non-deterministic order.
463 ///
464 /// # Example
465 /// ```rust
466 /// # #[cfg(feature = "deploy")] {
467 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
468 /// # use futures::StreamExt;
469 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
470 /// let tick = process.tick();
471 /// let optional = tick.optional_first_tick(q!(
472 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
473 /// ));
474 /// optional.flatten_unordered().all_ticks()
475 /// # }, |mut stream| async move {
476 /// // 1, 2, 3, but in no particular order
477 /// # let mut results = Vec::new();
478 /// # for _ in 0..3 {
479 /// # results.push(stream.next().await.unwrap());
480 /// # }
481 /// # results.sort();
482 /// # assert_eq!(results, vec![1, 2, 3]);
483 /// # }));
484 /// # }
485 /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure.
        self.flat_map_unordered(q!(|v| v))
    }
492
493 /// Creates an optional containing only the value if it satisfies a predicate `f`.
494 ///
495 /// If the optional is empty, the output optional is also empty. If the optional contains
496 /// a value and the predicate returns `true`, the output optional contains the same value.
497 /// If the predicate returns `false`, the output optional is empty.
498 ///
499 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
500 /// not modify or take ownership of the value. If you need to modify the value while filtering
501 /// use [`Optional::filter_map`] instead.
502 ///
503 /// # Example
504 /// ```rust
505 /// # #[cfg(feature = "deploy")] {
506 /// # use hydro_lang::prelude::*;
507 /// # use futures::StreamExt;
508 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
509 /// let tick = process.tick();
510 /// let optional = tick.optional_first_tick(q!(5));
511 /// optional.filter(q!(|&x| x > 3)).all_ticks()
512 /// # }, |mut stream| async move {
513 /// // 5
514 /// # assert_eq!(stream.next().await.unwrap(), 5);
515 /// # }));
516 /// # }
517 /// ```
518 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
519 where
520 F: Fn(&T) -> bool + 'a,
521 {
522 let f = f.splice_fn1_borrow_ctx(&self.location).into();
523 Optional::new(
524 self.location.clone(),
525 HydroNode::Filter {
526 f,
527 input: Box::new(self.ir_node.into_inner()),
528 metadata: self.location.new_node_metadata(Self::collection_kind()),
529 },
530 )
531 }
532
533 /// An operator that both filters and maps. It yields only the value if the supplied
534 /// closure `f` returns `Some(value)`.
535 ///
536 /// If the optional is empty, the output optional is also empty. If the optional contains
537 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
538 /// If the closure returns `None`, the output optional is empty.
539 ///
540 /// # Example
541 /// ```rust
542 /// # #[cfg(feature = "deploy")] {
543 /// # use hydro_lang::prelude::*;
544 /// # use futures::StreamExt;
545 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
546 /// let tick = process.tick();
547 /// let optional = tick.optional_first_tick(q!("42"));
548 /// optional
549 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
550 /// .all_ticks()
551 /// # }, |mut stream| async move {
552 /// // 42
553 /// # assert_eq!(stream.next().await.unwrap(), 42);
554 /// # }));
555 /// # }
556 /// ```
557 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
558 where
559 F: Fn(T) -> Option<U> + 'a,
560 {
561 let f = f.splice_fn1_ctx(&self.location).into();
562 Optional::new(
563 self.location.clone(),
564 HydroNode::FilterMap {
565 f,
566 input: Box::new(self.ir_node.into_inner()),
567 metadata: self
568 .location
569 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
570 },
571 )
572 }
573
    /// Combines this optional with another [`Singleton`] or [`Optional`] by tupling their values.
    ///
    /// If the other value is an [`Optional`], the output will be non-null only if the argument is
    /// non-null. This is useful for combining several pieces of state together.
578 ///
579 /// # Example
580 /// ```rust
581 /// # #[cfg(feature = "deploy")] {
582 /// # use hydro_lang::prelude::*;
583 /// # use futures::StreamExt;
584 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
585 /// let tick = process.tick();
586 /// let numbers = process
587 /// .source_iter(q!(vec![123, 456, 789]))
588 /// .batch(&tick, nondet!(/** test */));
589 /// let min = numbers.clone().min(); // Optional
590 /// let max = numbers.max(); // Optional
591 /// min.zip(max).all_ticks()
592 /// # }, |mut stream| async move {
593 /// // [(123, 789)]
594 /// # for w in vec![(123, 789)] {
595 /// # assert_eq!(stream.next().await.unwrap(), w);
596 /// # }
597 /// # }));
598 /// # }
599 /// ```
600 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
601 where
602 O: Clone,
603 {
604 let other: Optional<O, L, B> = other.into();
605 check_matching_location(&self.location, &other.location);
606
607 if L::is_top_level()
608 && let Some(tick) = self.location.try_tick()
609 {
610 let out = zip_inside_tick(
611 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
612 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
613 )
614 .latest();
615
616 Optional::new(out.location, out.ir_node.into_inner())
617 } else {
618 zip_inside_tick(self, other)
619 }
620 }
621
622 /// Passes through `self` when it has a value, otherwise passes through `other`.
623 ///
624 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
625 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
626 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
627 ///
628 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
629 /// of the inputs change (including to/from null states).
630 ///
631 /// # Example
632 /// ```rust
633 /// # #[cfg(feature = "deploy")] {
634 /// # use hydro_lang::prelude::*;
635 /// # use futures::StreamExt;
636 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
637 /// let tick = process.tick();
638 /// // ticks are lazy by default, forces the second tick to run
639 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
640 ///
641 /// let some_first_tick = tick.optional_first_tick(q!(123));
642 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
643 /// some_first_tick.or(some_second_tick).all_ticks()
644 /// # }, |mut stream| async move {
645 /// // [123 /* first tick */, 456 /* second tick */]
646 /// # for w in vec![123, 456] {
647 /// # assert_eq!(stream.next().await.unwrap(), w);
648 /// # }
649 /// # }));
650 /// # }
651 /// ```
652 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
653 check_matching_location(&self.location, &other.location);
654
655 if L::is_top_level()
656 && let Some(tick) = self.location.try_tick()
657 {
658 let out = or_inside_tick(
659 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
660 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
661 )
662 .latest();
663
664 Optional::new(out.location, out.ir_node.into_inner())
665 } else {
666 Optional::new(
667 self.location.clone(),
668 HydroNode::ChainFirst {
669 first: Box::new(self.ir_node.into_inner()),
670 second: Box::new(other.ir_node.into_inner()),
671 metadata: self.location.new_node_metadata(Self::collection_kind()),
672 },
673 )
674 }
675 }
676
677 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
678 ///
679 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
680 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
681 ///
682 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
683 /// of the inputs change (including to/from null states).
684 ///
685 /// # Example
686 /// ```rust
687 /// # #[cfg(feature = "deploy")] {
688 /// # use hydro_lang::prelude::*;
689 /// # use futures::StreamExt;
690 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
691 /// let tick = process.tick();
692 /// // ticks are lazy by default, forces the later ticks to run
693 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
694 ///
695 /// let some_first_tick = tick.optional_first_tick(q!(123));
696 /// some_first_tick
697 /// .unwrap_or(tick.singleton(q!(456)))
698 /// .all_ticks()
699 /// # }, |mut stream| async move {
700 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
701 /// # for w in vec![123, 456, 456, 456] {
702 /// # assert_eq!(stream.next().await.unwrap(), w);
703 /// # }
704 /// # }));
705 /// # }
706 /// ```
707 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
708 let res_option = self.or(other.into());
709 Singleton::new(
710 res_option.location.clone(),
711 HydroNode::Cast {
712 inner: Box::new(res_option.ir_node.into_inner()),
713 metadata: res_option
714 .location
715 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
716 },
717 )
718 }
719
720 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
721 ///
722 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
723 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
724 /// so that Hydro can skip any computation on null values.
725 ///
726 /// # Example
727 /// ```rust
728 /// # #[cfg(feature = "deploy")] {
729 /// # use hydro_lang::prelude::*;
730 /// # use futures::StreamExt;
731 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
732 /// let tick = process.tick();
733 /// // ticks are lazy by default, forces the later ticks to run
734 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
735 ///
736 /// let some_first_tick = tick.optional_first_tick(q!(123));
737 /// some_first_tick.into_singleton().all_ticks()
738 /// # }, |mut stream| async move {
739 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
740 /// # for w in vec![Some(123), None, None, None] {
741 /// # assert_eq!(stream.next().await.unwrap(), w);
742 /// # }
743 /// # }));
744 /// # }
745 /// ```
746 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
747 where
748 T: Clone,
749 {
750 let none: syn::Expr = parse_quote!(::std::option::Option::None);
751
752 let none_singleton = Singleton::new(
753 self.location.clone(),
754 HydroNode::SingletonSource {
755 value: none.into(),
756 metadata: self
757 .location
758 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
759 },
760 );
761
762 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
763 }
764
765 /// An operator which allows you to "name" a `HydroNode`.
766 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
767 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
768 {
769 let mut node = self.ir_node.borrow_mut();
770 let metadata = node.metadata_mut();
771 metadata.tag = Some(name.to_string());
772 }
773 self
774 }
775}
776
777impl<'a, T, L> Optional<T, L, Bounded>
778where
779 L: Location<'a>,
780{
781 /// Filters this optional, passing through the optional value if it is non-null **and** the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
783 ///
784 /// Useful for conditionally processing, such as only emitting an optional's value outside
785 /// a tick if some other condition is satisfied.
786 ///
787 /// # Example
788 /// ```rust
789 /// # #[cfg(feature = "deploy")] {
790 /// # use hydro_lang::prelude::*;
791 /// # use futures::StreamExt;
792 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
793 /// let tick = process.tick();
794 /// // ticks are lazy by default, forces the second tick to run
795 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
796 ///
797 /// let batch_first_tick = process
798 /// .source_iter(q!(vec![]))
799 /// .batch(&tick, nondet!(/** test */));
800 /// let batch_second_tick = process
801 /// .source_iter(q!(vec![456]))
802 /// .batch(&tick, nondet!(/** test */))
803 /// .defer_tick(); // appears on the second tick
804 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
805 /// batch_first_tick.chain(batch_second_tick).first()
806 /// .filter_if_some(some_on_first_tick)
807 /// .unwrap_or(tick.singleton(q!(789)))
808 /// .all_ticks()
809 /// # }, |mut stream| async move {
810 /// // [789, 789]
811 /// # for w in vec![789, 789] {
812 /// # assert_eq!(stream.next().await.unwrap(), w);
813 /// # }
814 /// # }));
815 /// # }
816 /// ```
817 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
818 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
819 }
820
821 /// Filters this optional, passing through the optional value if it is non-null **and** the
    /// argument (a [`Bounded`] [`Optional`]) is _null_, otherwise the output is null.
823 ///
824 /// Useful for conditionally processing, such as only emitting an optional's value outside
825 /// a tick if some other condition is satisfied.
826 ///
827 /// # Example
828 /// ```rust
829 /// # #[cfg(feature = "deploy")] {
830 /// # use hydro_lang::prelude::*;
831 /// # use futures::StreamExt;
832 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
833 /// let tick = process.tick();
834 /// // ticks are lazy by default, forces the second tick to run
835 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
836 ///
837 /// let batch_first_tick = process
838 /// .source_iter(q!(vec![]))
839 /// .batch(&tick, nondet!(/** test */));
840 /// let batch_second_tick = process
841 /// .source_iter(q!(vec![456]))
842 /// .batch(&tick, nondet!(/** test */))
843 /// .defer_tick(); // appears on the second tick
844 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
845 /// batch_first_tick.chain(batch_second_tick).first()
846 /// .filter_if_none(some_on_first_tick)
847 /// .unwrap_or(tick.singleton(q!(789)))
848 /// .all_ticks()
849 /// # }, |mut stream| async move {
    /// // [789 /* first tick */, 456 /* second tick */]
851 /// # for w in vec![789, 456] {
852 /// # assert_eq!(stream.next().await.unwrap(), w);
853 /// # }
854 /// # }));
855 /// # }
856 /// ```
857 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
858 self.filter_if_some(
859 other
860 .map(q!(|_| ()))
861 .into_singleton()
862 .filter(q!(|o| o.is_none())),
863 )
864 }
865
    /// If `self` is null, emits a null optional, but if it is non-null, emits `value`.
867 ///
868 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
869 /// having a value, such as only releasing a piece of state if the node is the leader.
870 ///
871 /// # Example
872 /// ```rust
873 /// # #[cfg(feature = "deploy")] {
874 /// # use hydro_lang::prelude::*;
875 /// # use futures::StreamExt;
876 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
877 /// let tick = process.tick();
878 /// // ticks are lazy by default, forces the second tick to run
879 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
880 ///
881 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
882 /// some_on_first_tick
883 /// .if_some_then(tick.singleton(q!(456)))
884 /// .unwrap_or(tick.singleton(q!(123)))
885 /// # .all_ticks()
886 /// # }, |mut stream| async move {
887 /// // 456 (first tick) ~> 123 (second tick onwards)
888 /// # for w in vec![456, 123, 123] {
889 /// # assert_eq!(stream.next().await.unwrap(), w);
890 /// # }
891 /// # }));
892 /// # }
893 /// ```
    pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
        // Roles are swapped here: `value` is the data being gated and `self` is the signal.
        // NOTE(review): relies on `Singleton::filter_if_some`, which is defined outside
        // the part of the code visible here.
        value.filter_if_some(self)
    }
897}
898
899impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
900where
901 L: Location<'a> + NoTick,
902{
903 /// Returns an optional value corresponding to the latest snapshot of the optional
904 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
905 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
906 /// all snapshots of this optional into the atomic-associated tick will observe the
907 /// same value each tick.
908 ///
909 /// # Non-Determinism
910 /// Because this picks a snapshot of a optional whose value is continuously changing,
911 /// the output optional has a non-deterministic value since the snapshot can be at an
912 /// arbitrary point in time.
913 pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
914 Optional::new(
915 self.location.clone().tick,
916 HydroNode::Batch {
917 inner: Box::new(self.ir_node.into_inner()),
918 metadata: self
919 .location
920 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
921 },
922 )
923 }
924
925 /// Returns this optional back into a top-level, asynchronous execution context where updates
926 /// to the value will be asynchronously propagated.
927 pub fn end_atomic(self) -> Optional<T, L, B> {
928 Optional::new(
929 self.location.tick.l.clone(),
930 HydroNode::EndAtomic {
931 inner: Box::new(self.ir_node.into_inner()),
932 metadata: self
933 .location
934 .tick
935 .l
936 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
937 },
938 )
939 }
940}
941
942impl<'a, T, L, B: Boundedness> Optional<T, L, B>
943where
944 L: Location<'a>,
945{
946 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
947 /// will observe the same version of the value and will be executed synchronously before any
948 /// outputs are yielded (in [`Optional::end_atomic`]).
949 ///
950 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
951 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
952 /// a different version).
953 ///
954 /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
955 /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
956 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
957 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
958 let out_location = Atomic { tick: tick.clone() };
959 Optional::new(
960 out_location.clone(),
961 HydroNode::BeginAtomic {
962 inner: Box::new(self.ir_node.into_inner()),
963 metadata: out_location
964 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
965 },
966 )
967 }
968
969 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
970 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
971 /// relevant data that contributed to the snapshot at tick `t`.
972 ///
973 /// # Non-Determinism
974 /// Because this picks a snapshot of a optional whose value is continuously changing,
975 /// the output optional has a non-deterministic value since the snapshot can be at an
976 /// arbitrary point in time.
977 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
978 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
979 Optional::new(
980 tick.clone(),
981 HydroNode::Batch {
982 inner: Box::new(self.ir_node.into_inner()),
983 metadata: tick
984 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
985 },
986 )
987 }
988
989 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
990 /// with order corresponding to increasing prefixes of data contributing to the optional.
991 ///
992 /// # Non-Determinism
993 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
994 /// to non-deterministic batching and arrival of inputs, the output stream is
995 /// non-deterministic.
996 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
997 where
998 L: NoTick,
999 {
1000 let tick = self.location.tick();
1001 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
1002 }
1003
1004 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1005 /// value taken at various points in time. Because the input optional may be
1006 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1007 /// represent the value of the optional given some prefix of the streams leading up to
1008 /// it.
1009 ///
1010 /// # Non-Determinism
1011 /// The output stream is non-deterministic in which elements are sampled, since this
1012 /// is controlled by a clock.
1013 pub fn sample_every(
1014 self,
1015 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1016 nondet: NonDet,
1017 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1018 where
1019 L: NoTick + NoAtomic,
1020 {
1021 let samples = self.location.source_interval(interval, nondet);
1022 let tick = self.location.tick();
1023
1024 self.snapshot(&tick, nondet)
1025 .filter_if_some(samples.batch(&tick, nondet).first())
1026 .all_ticks()
1027 .weakest_retries()
1028 }
1029}
1030
1031impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1032where
1033 L: Location<'a>,
1034{
1035 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1036 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1037 /// null values).
1038 ///
1039 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1040 /// producing one element in the output for each (non-null) tick. This is useful for batched
1041 /// computations, where the results from each tick must be combined together.
1042 ///
1043 /// # Example
1044 /// ```rust
1045 /// # #[cfg(feature = "deploy")] {
1046 /// # use hydro_lang::prelude::*;
1047 /// # use futures::StreamExt;
1048 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1049 /// # let tick = process.tick();
1050 /// # // ticks are lazy by default, forces the second tick to run
1051 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1052 /// # let batch_first_tick = process
1053 /// # .source_iter(q!(vec![]))
1054 /// # .batch(&tick, nondet!(/** test */));
1055 /// # let batch_second_tick = process
1056 /// # .source_iter(q!(vec![1, 2, 3]))
1057 /// # .batch(&tick, nondet!(/** test */))
1058 /// # .defer_tick(); // appears on the second tick
1059 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1060 /// input_batch // first tick: [], second tick: [1, 2, 3]
1061 /// .max()
1062 /// .all_ticks()
1063 /// # }, |mut stream| async move {
1064 /// // [3]
1065 /// # for w in vec![3] {
1066 /// # assert_eq!(stream.next().await.unwrap(), w);
1067 /// # }
1068 /// # }));
1069 /// # }
1070 /// ```
1071 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1072 self.into_stream().all_ticks()
1073 }
1074
1075 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1076 /// which will stream the value computed in _each_ tick as a separate stream element.
1077 ///
1078 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1079 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1080 /// optional's [`Tick`] context.
1081 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1082 self.into_stream().all_ticks_atomic()
1083 }
1084
1085 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1086 /// be asynchronously updated with the latest value of the optional inside the tick, including
1087 /// whether the optional is null or not.
1088 ///
1089 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1090 /// tick that tracks the inner value. This is useful for getting the value as of the
1091 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1092 ///
1093 /// # Example
1094 /// ```rust
1095 /// # #[cfg(feature = "deploy")] {
1096 /// # use hydro_lang::prelude::*;
1097 /// # use futures::StreamExt;
1098 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1099 /// # let tick = process.tick();
1100 /// # // ticks are lazy by default, forces the second tick to run
1101 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1102 /// # let batch_first_tick = process
1103 /// # .source_iter(q!(vec![]))
1104 /// # .batch(&tick, nondet!(/** test */));
1105 /// # let batch_second_tick = process
1106 /// # .source_iter(q!(vec![1, 2, 3]))
1107 /// # .batch(&tick, nondet!(/** test */))
1108 /// # .defer_tick(); // appears on the second tick
1109 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1110 /// input_batch // first tick: [], second tick: [1, 2, 3]
1111 /// .max()
1112 /// .latest()
1113 /// # .into_singleton()
1114 /// # .sample_eager(nondet!(/** test */))
1115 /// # }, |mut stream| async move {
1116 /// // asynchronously changes from None ~> 3
1117 /// # for w in vec![None, Some(3)] {
1118 /// # assert_eq!(stream.next().await.unwrap(), w);
1119 /// # }
1120 /// # }));
1121 /// # }
1122 /// ```
1123 pub fn latest(self) -> Optional<T, L, Unbounded> {
1124 Optional::new(
1125 self.location.outer().clone(),
1126 HydroNode::YieldConcat {
1127 inner: Box::new(self.ir_node.into_inner()),
1128 metadata: self
1129 .location
1130 .outer()
1131 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1132 },
1133 )
1134 }
1135
1136 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1137 /// be updated with the latest value of the optional inside the tick.
1138 ///
1139 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1140 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1141 /// optional's [`Tick`] context.
1142 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1143 let out_location = Atomic {
1144 tick: self.location.clone(),
1145 };
1146
1147 Optional::new(
1148 out_location.clone(),
1149 HydroNode::YieldConcat {
1150 inner: Box::new(self.ir_node.into_inner()),
1151 metadata: out_location
1152 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1153 },
1154 )
1155 }
1156
1157 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1158 /// always has the state of `self` at tick `T - 1`.
1159 ///
1160 /// At tick `0`, the output optional is null, since there is no previous tick.
1161 ///
1162 /// This operator enables stateful iterative processing with ticks, by sending data from one
1163 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1164 ///
1165 /// # Example
1166 /// ```rust
1167 /// # #[cfg(feature = "deploy")] {
1168 /// # use hydro_lang::prelude::*;
1169 /// # use futures::StreamExt;
1170 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1171 /// let tick = process.tick();
1172 /// // ticks are lazy by default, forces the second tick to run
1173 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1174 ///
1175 /// let batch_first_tick = process
1176 /// .source_iter(q!(vec![1, 2]))
1177 /// .batch(&tick, nondet!(/** test */));
1178 /// let batch_second_tick = process
1179 /// .source_iter(q!(vec![3, 4]))
1180 /// .batch(&tick, nondet!(/** test */))
1181 /// .defer_tick(); // appears on the second tick
1182 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1183 /// .reduce(q!(|state, v| *state += v));
1184 ///
1185 /// current_tick_sum.clone().into_singleton().zip(
1186 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1187 /// ).all_ticks()
1188 /// # }, |mut stream| async move {
1189 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1190 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1191 /// # assert_eq!(stream.next().await.unwrap(), w);
1192 /// # }
1193 /// # }));
1194 /// # }
1195 /// ```
1196 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1197 Optional::new(
1198 self.location.clone(),
1199 HydroNode::DeferTick {
1200 input: Box::new(self.ir_node.into_inner()),
1201 metadata: self.location.new_node_metadata(Self::collection_kind()),
1202 },
1203 )
1204 }
1205
1206 #[deprecated(note = "use .into_stream().persist()")]
1207 #[expect(missing_docs, reason = "deprecated")]
1208 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1209 where
1210 T: Clone,
1211 {
1212 self.into_stream().persist()
1213 }
1214
1215 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1216 /// non-null. Otherwise, the stream is empty.
1217 ///
1218 /// # Example
1219 /// ```rust
1220 /// # #[cfg(feature = "deploy")] {
1221 /// # use hydro_lang::prelude::*;
1222 /// # use futures::StreamExt;
1223 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1224 /// # let tick = process.tick();
1225 /// # // ticks are lazy by default, forces the second tick to run
1226 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1227 /// # let batch_first_tick = process
1228 /// # .source_iter(q!(vec![]))
1229 /// # .batch(&tick, nondet!(/** test */));
1230 /// # let batch_second_tick = process
1231 /// # .source_iter(q!(vec![123, 456]))
1232 /// # .batch(&tick, nondet!(/** test */))
1233 /// # .defer_tick(); // appears on the second tick
1234 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1235 /// input_batch // first tick: [], second tick: [123, 456]
1236 /// .clone()
1237 /// .max()
1238 /// .into_stream()
1239 /// .chain(input_batch)
1240 /// .all_ticks()
1241 /// # }, |mut stream| async move {
1242 /// // [456, 123, 456]
1243 /// # for w in vec![456, 123, 456] {
1244 /// # assert_eq!(stream.next().await.unwrap(), w);
1245 /// # }
1246 /// # }));
1247 /// # }
1248 /// ```
1249 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1250 Stream::new(
1251 self.location.clone(),
1252 HydroNode::Cast {
1253 inner: Box::new(self.ir_node.into_inner()),
1254 metadata: self.location.new_node_metadata(Stream::<
1255 T,
1256 Tick<L>,
1257 Bounded,
1258 TotalOrder,
1259 ExactlyOnce,
1260 >::collection_kind()),
1261 },
1262 )
1263 }
1264}
1265
#[cfg(feature = "deploy")]
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use stageleft::q;

    use super::Optional;
    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// `a.or(a)` on an inhabited optional must still contain exactly one element.
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        // Build an inhabited optional inside a tick by converting a singleton.
        let tick = process.tick();
        let singleton = tick.singleton(q!(123));
        let inhabited: Optional<_, _, _> = singleton.into();

        // Count the elements left after or-ing the optional with itself.
        let counts = inhabited
            .clone()
            .or(inhabited)
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }

    /// `into_singleton` on a null top-level optional must snapshot to exactly one element
    /// in every tick.
    #[tokio::test]
    async fn into_singleton_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = process.tick();

        // A top-level optional whose value is always filtered out.
        let always_none = process.singleton(q!(123)).filter(q!(|_| false));
        let as_singleton = always_none.into_singleton();

        // Zipped in below so that each reported count is paired with a batch of `spin()` output.
        let tick_driver = process.spin();

        let counts = as_singleton
            .snapshot(&tick, nondet!(/** test */))
            .into_stream()
            .count()
            .zip(tick_driver.batch(&tick, nondet!(/** test */)).count())
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        // The first three reported counts must all be 1.
        for _ in 0..3 {
            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }
}