hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
/// A single Rust value that can asynchronously change over time.
///
/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
/// [`Unbounded`], the value will asynchronously change over time.
///
/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
/// a single number that will asynchronously change as events are processed. Singletons also appear
/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
/// such as getting the length of a batch of requests.
///
/// Type Parameters:
/// - `Type`: the type of the value in this singleton
/// - `Loc`: the [`Location`] where the singleton is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: Boundedness> {
    // The location handle this singleton was built with; exposed through `Singleton::location`.
    pub(crate) location: Loc,
    // The IR node that produces this singleton. It lives in a `RefCell` because the `Clone`
    // impl rewrites it in place (into a `HydroNode::Tee`) through a shared reference.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Zero-sized marker that ties the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
44
45impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
46where
47 T: Clone,
48 L: Location<'a> + NoTick,
49{
50 fn from(value: Singleton<T, L, Bounded>) -> Self {
51 let tick = value.location().tick();
52 value.clone_into_tick(&tick).latest()
53 }
54}
55
56impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
57where
58 L: Location<'a>,
59{
60 type Location = Tick<L>;
61
62 fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
63 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
64 location.clone(),
65 HydroNode::DeferTick {
66 input: Box::new(HydroNode::CycleSource {
67 ident,
68 metadata: location.new_node_metadata(Self::collection_kind()),
69 }),
70 metadata: location
71 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
72 },
73 );
74
75 from_previous_tick.unwrap_or(initial)
76 }
77}
78
79impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
80where
81 L: Location<'a>,
82{
83 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
84 assert_eq!(
85 Location::id(&self.location),
86 expected_location,
87 "locations do not match"
88 );
89 self.location
90 .flow_state()
91 .borrow_mut()
92 .push_root(HydroRoot::CycleSink {
93 ident,
94 input: Box::new(self.ir_node.into_inner()),
95 op_metadata: HydroIrOpMetadata::new(),
96 });
97 }
98}
99
100impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
101where
102 L: Location<'a>,
103{
104 type Location = Tick<L>;
105
106 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
107 Singleton::new(
108 location.clone(),
109 HydroNode::CycleSource {
110 ident,
111 metadata: location.new_node_metadata(Self::collection_kind()),
112 },
113 )
114 }
115}
116
117impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
118where
119 L: Location<'a>,
120{
121 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
122 assert_eq!(
123 Location::id(&self.location),
124 expected_location,
125 "locations do not match"
126 );
127 self.location
128 .flow_state()
129 .borrow_mut()
130 .push_root(HydroRoot::CycleSink {
131 ident,
132 input: Box::new(self.ir_node.into_inner()),
133 op_metadata: HydroIrOpMetadata::new(),
134 });
135 }
136}
137
138impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
139where
140 L: Location<'a> + NoTick,
141{
142 type Location = L;
143
144 fn create_source(ident: syn::Ident, location: L) -> Self {
145 Singleton::new(
146 location.clone(),
147 HydroNode::CycleSource {
148 ident,
149 metadata: location.new_node_metadata(Self::collection_kind()),
150 },
151 )
152 }
153}
154
155impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
156where
157 L: Location<'a> + NoTick,
158{
159 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
160 assert_eq!(
161 Location::id(&self.location),
162 expected_location,
163 "locations do not match"
164 );
165 self.location
166 .flow_state()
167 .borrow_mut()
168 .push_root(HydroRoot::CycleSink {
169 ident,
170 input: Box::new(self.ir_node.into_inner()),
171 op_metadata: HydroIrOpMetadata::new(),
172 });
173 }
174}
175
176impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
177where
178 T: Clone,
179 L: Location<'a>,
180{
181 fn clone(&self) -> Self {
182 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
183 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
184 *self.ir_node.borrow_mut() = HydroNode::Tee {
185 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
186 metadata: self.location.new_node_metadata(Self::collection_kind()),
187 };
188 }
189
190 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
191 Singleton {
192 location: self.location.clone(),
193 ir_node: HydroNode::Tee {
194 inner: TeeNode(inner.0.clone()),
195 metadata: metadata.clone(),
196 }
197 .into(),
198 _phantom: PhantomData,
199 }
200 } else {
201 unreachable!()
202 }
203 }
204}
205
// Zips a singleton that lives inside a tick with `other` by building a
// `HydroNode::CrossSingleton` over the two IR nodes. The node's metadata describes a
// singleton of the zipped element type; the final output is produced by `ZipResult::make`.
// `check_matching_location` is called on the two locations before the node is built.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: O,
) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
where
    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
{
    let other_location = <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::other_location(&other);
    check_matching_location(&me.location, &other_location);

    let left = Box::new(me.ir_node.into_inner());
    let right = Box::new(
        <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::other_ir_node(other),
    );
    let zipped_kind = CollectionKind::Singleton {
        bound: B::BOUND_KIND,
        element_type: stageleft::quote_type::<
            <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
        >()
        .into(),
    };
    let metadata = me.location.new_node_metadata(zipped_kind);

    <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::make(
        me.location,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
234
235impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
236where
237 L: Location<'a>,
238{
239 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
240 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
241 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
242 Singleton {
243 location,
244 ir_node: RefCell::new(ir_node),
245 _phantom: PhantomData,
246 }
247 }
248
249 pub(crate) fn collection_kind() -> CollectionKind {
250 CollectionKind::Singleton {
251 bound: B::BOUND_KIND,
252 element_type: stageleft::quote_type::<T>().into(),
253 }
254 }
255
    /// Returns the [`Location`] where this singleton is being materialized.
    ///
    /// The location is returned by reference; the singleton is not consumed.
    pub fn location(&self) -> &L {
        &self.location
    }
260
261 /// Transforms the singleton value by applying a function `f` to it,
262 /// continuously as the input is updated.
263 ///
264 /// # Example
265 /// ```rust
266 /// # #[cfg(feature = "deploy")] {
267 /// # use hydro_lang::prelude::*;
268 /// # use futures::StreamExt;
269 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
270 /// let tick = process.tick();
271 /// let singleton = tick.singleton(q!(5));
272 /// singleton.map(q!(|v| v * 2)).all_ticks()
273 /// # }, |mut stream| async move {
274 /// // 10
275 /// # assert_eq!(stream.next().await.unwrap(), 10);
276 /// # }));
277 /// # }
278 /// ```
279 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
280 where
281 F: Fn(T) -> U + 'a,
282 {
283 let f = f.splice_fn1_ctx(&self.location).into();
284 Singleton::new(
285 self.location.clone(),
286 HydroNode::Map {
287 f,
288 input: Box::new(self.ir_node.into_inner()),
289 metadata: self
290 .location
291 .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
292 },
293 )
294 }
295
296 /// Transforms the singleton value by applying a function `f` to it and then flattening
297 /// the result into a stream, preserving the order of elements.
298 ///
299 /// The function `f` is applied to the singleton value to produce an iterator, and all items
300 /// from that iterator are emitted in the output stream in deterministic order.
301 ///
302 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
303 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
304 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
305 ///
306 /// # Example
307 /// ```rust
308 /// # #[cfg(feature = "deploy")] {
309 /// # use hydro_lang::prelude::*;
310 /// # use futures::StreamExt;
311 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
312 /// let tick = process.tick();
313 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
314 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
315 /// # }, |mut stream| async move {
316 /// // 1, 2, 3
317 /// # for w in vec![1, 2, 3] {
318 /// # assert_eq!(stream.next().await.unwrap(), w);
319 /// # }
320 /// # }));
321 /// # }
322 /// ```
323 pub fn flat_map_ordered<U, I, F>(
324 self,
325 f: impl IntoQuotedMut<'a, F, L>,
326 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
327 where
328 I: IntoIterator<Item = U>,
329 F: Fn(T) -> I + 'a,
330 {
331 let f = f.splice_fn1_ctx(&self.location).into();
332 Stream::new(
333 self.location.clone(),
334 HydroNode::FlatMap {
335 f,
336 input: Box::new(self.ir_node.into_inner()),
337 metadata: self.location.new_node_metadata(
338 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
339 ),
340 },
341 )
342 }
343
344 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
345 /// for the output type `I` to produce items in any order.
346 ///
347 /// The function `f` is applied to the singleton value to produce an iterator, and all items
348 /// from that iterator are emitted in the output stream in non-deterministic order.
349 ///
350 /// # Example
351 /// ```rust
352 /// # #[cfg(feature = "deploy")] {
353 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
354 /// # use futures::StreamExt;
355 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
356 /// let tick = process.tick();
357 /// let singleton = tick.singleton(q!(
358 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
359 /// ));
360 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
361 /// # }, |mut stream| async move {
362 /// // 1, 2, 3, but in no particular order
363 /// # let mut results = Vec::new();
364 /// # for _ in 0..3 {
365 /// # results.push(stream.next().await.unwrap());
366 /// # }
367 /// # results.sort();
368 /// # assert_eq!(results, vec![1, 2, 3]);
369 /// # }));
370 /// # }
371 /// ```
372 pub fn flat_map_unordered<U, I, F>(
373 self,
374 f: impl IntoQuotedMut<'a, F, L>,
375 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
376 where
377 I: IntoIterator<Item = U>,
378 F: Fn(T) -> I + 'a,
379 {
380 let f = f.splice_fn1_ctx(&self.location).into();
381 Stream::new(
382 self.location.clone(),
383 HydroNode::FlatMap {
384 f,
385 input: Box::new(self.ir_node.into_inner()),
386 metadata: self
387 .location
388 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
389 },
390 )
391 }
392
    /// Flattens the singleton value into a stream, preserving the order of elements.
    ///
    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
    /// are emitted in the output stream in deterministic order.
    ///
    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
    ///
    /// This is [`Singleton::flat_map_ordered`] called with the identity closure.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
    /// singleton.flatten_ordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|x| x))
    }
425
    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
    /// are emitted in the output stream in non-deterministic order.
    ///
    /// This is [`Singleton::flat_map_unordered`] called with the identity closure.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let singleton = tick.singleton(q!(
    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
    /// ));
    /// singleton.flatten_unordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, but in no particular order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|x| x))
    }
460
461 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
462 ///
463 /// If the predicate returns `true`, the output optional contains the same value.
464 /// If the predicate returns `false`, the output optional is empty.
465 ///
466 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
467 /// not modify or take ownership of the value. If you need to modify the value while filtering
468 /// use [`Singleton::filter_map`] instead.
469 ///
470 /// # Example
471 /// ```rust
472 /// # #[cfg(feature = "deploy")] {
473 /// # use hydro_lang::prelude::*;
474 /// # use futures::StreamExt;
475 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
476 /// let tick = process.tick();
477 /// let singleton = tick.singleton(q!(5));
478 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
479 /// # }, |mut stream| async move {
480 /// // 5
481 /// # assert_eq!(stream.next().await.unwrap(), 5);
482 /// # }));
483 /// # }
484 /// ```
485 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
486 where
487 F: Fn(&T) -> bool + 'a,
488 {
489 let f = f.splice_fn1_borrow_ctx(&self.location).into();
490 Optional::new(
491 self.location.clone(),
492 HydroNode::Filter {
493 f,
494 input: Box::new(self.ir_node.into_inner()),
495 metadata: self
496 .location
497 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
498 },
499 )
500 }
501
502 /// An operator that both filters and maps. It yields the value only if the supplied
503 /// closure `f` returns `Some(value)`.
504 ///
505 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
506 /// If the closure returns `None`, the output optional is empty.
507 ///
508 /// # Example
509 /// ```rust
510 /// # #[cfg(feature = "deploy")] {
511 /// # use hydro_lang::prelude::*;
512 /// # use futures::StreamExt;
513 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
514 /// let tick = process.tick();
515 /// let singleton = tick.singleton(q!("42"));
516 /// singleton
517 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
518 /// .all_ticks()
519 /// # }, |mut stream| async move {
520 /// // 42
521 /// # assert_eq!(stream.next().await.unwrap(), 42);
522 /// # }));
523 /// # }
524 /// ```
525 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
526 where
527 F: Fn(T) -> Option<U> + 'a,
528 {
529 let f = f.splice_fn1_ctx(&self.location).into();
530 Optional::new(
531 self.location.clone(),
532 HydroNode::FilterMap {
533 f,
534 input: Box::new(self.ir_node.into_inner()),
535 metadata: self
536 .location
537 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
538 },
539 )
540 }
541
    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
    ///
    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
    /// non-null. This is useful for combining several pieces of state together.
    ///
    /// Both operands must pass `check_matching_location`, which is called before any node is
    /// built.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///   .source_iter(q!(vec![123, 456]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let count = numbers.clone().count(); // Singleton
    /// let max = numbers.max(); // Optional
    /// count.zip(max).all_ticks()
    /// # }, |mut stream| async move {
    /// // [(2, 456)]
    /// # for w in vec![(2, 456)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
    {
        check_matching_location(&self.location, &Self::other_location(&other));

        // Top-level case: when the location is top-level and yields a tick via `try_tick()`,
        // both operands are snapshotted into that tick, zipped there by `zip_inside_tick`, and
        // the result is brought back out with `latest()`.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                // The other operand is re-wrapped as an `Optional` so it can be snapshotted.
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    Self::other_location(&other),
                    Self::other_ir_node(other),
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            Self::make(out.location, out.ir_node.into_inner())
        } else {
            // Otherwise the `CrossSingleton` node is built directly at this location.
            // NOTE(review): the metadata here uses `CollectionKind::Optional`, whereas
            // `zip_inside_tick` tags the same node shape as `CollectionKind::Singleton` —
            // confirm against the `ZipResult::make` implementations which one is intended.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.into_inner()),
                    right: Box::new(Self::other_ir_node(other)),
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
606
607 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
608 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
609 ///
610 /// Useful for conditionally processing, such as only emitting a singleton's value outside
611 /// a tick if some other condition is satisfied.
612 ///
613 /// # Example
614 /// ```rust
615 /// # #[cfg(feature = "deploy")] {
616 /// # use hydro_lang::prelude::*;
617 /// # use futures::StreamExt;
618 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
619 /// let tick = process.tick();
620 /// // ticks are lazy by default, forces the second tick to run
621 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
622 ///
623 /// let batch_first_tick = process
624 /// .source_iter(q!(vec![1]))
625 /// .batch(&tick, nondet!(/** test */));
626 /// let batch_second_tick = process
627 /// .source_iter(q!(vec![1, 2, 3]))
628 /// .batch(&tick, nondet!(/** test */))
629 /// .defer_tick(); // appears on the second tick
630 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
631 /// batch_first_tick.chain(batch_second_tick).count()
632 /// .filter_if_some(some_on_first_tick)
633 /// .all_ticks()
634 /// # }, |mut stream| async move {
635 /// // [1]
636 /// # for w in vec![1] {
637 /// # assert_eq!(stream.next().await.unwrap(), w);
638 /// # }
639 /// # }));
640 /// # }
641 /// ```
642 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
643 self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
644 .map(q!(|(d, _signal)| d))
645 }
646
647 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
648 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
649 ///
650 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
651 /// the condition.
652 ///
653 /// # Example
654 /// ```rust
655 /// # #[cfg(feature = "deploy")] {
656 /// # use hydro_lang::prelude::*;
657 /// # use futures::StreamExt;
658 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
659 /// let tick = process.tick();
660 /// // ticks are lazy by default, forces the second tick to run
661 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
662 ///
663 /// let batch_first_tick = process
664 /// .source_iter(q!(vec![1]))
665 /// .batch(&tick, nondet!(/** test */));
666 /// let batch_second_tick = process
667 /// .source_iter(q!(vec![1, 2, 3]))
668 /// .batch(&tick, nondet!(/** test */))
669 /// .defer_tick(); // appears on the second tick
670 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
671 /// batch_first_tick.chain(batch_second_tick).count()
672 /// .filter_if_none(some_on_first_tick)
673 /// .all_ticks()
674 /// # }, |mut stream| async move {
675 /// // [3]
676 /// # for w in vec![3] {
677 /// # assert_eq!(stream.next().await.unwrap(), w);
678 /// # }
679 /// # }));
680 /// # }
681 /// ```
682 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
683 self.filter_if_some(
684 other
685 .map(q!(|_| ()))
686 .into_singleton()
687 .filter(q!(|o| o.is_none())),
688 )
689 }
690
691 /// An operator which allows you to "name" a `HydroNode`.
692 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
693 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
694 {
695 let mut node = self.ir_node.borrow_mut();
696 let metadata = node.metadata_mut();
697 metadata.tag = Some(name.to_owned());
698 }
699 self
700 }
701}
702
impl<'a, T, L, B: Boundedness> Singleton<Option<T>, L, B>
where
    L: Location<'a>,
{
    /// Converts a `Singleton<Option<T>, L, B>` into an `Optional<T, L, B>` by unwrapping
    /// the inner `Option`.
    ///
    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
    /// `Option<T>` directly. If the singleton's value is `Some(v)`, the resulting
    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let singleton = tick.singleton(q!(Some(42)));
    /// singleton.into_optional().all_ticks()
    /// # }, |mut stream| async move {
    /// // 42
    /// # assert_eq!(stream.next().await.unwrap(), 42);
    /// # }));
    /// # }
    /// ```
    pub fn into_optional(self) -> Optional<T, L, B> {
        self.filter_map(q!(|v| v))
    }
}
733
734impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
735where
736 L: Location<'a> + NoTick,
737{
738 /// Returns a singleton value corresponding to the latest snapshot of the singleton
739 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
740 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
741 /// all snapshots of this singleton into the atomic-associated tick will observe the
742 /// same value each tick.
743 ///
744 /// # Non-Determinism
745 /// Because this picks a snapshot of a singleton whose value is continuously changing,
746 /// the output singleton has a non-deterministic value since the snapshot can be at an
747 /// arbitrary point in time.
748 pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
749 Singleton::new(
750 self.location.clone().tick,
751 HydroNode::Batch {
752 inner: Box::new(self.ir_node.into_inner()),
753 metadata: self
754 .location
755 .tick
756 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
757 },
758 )
759 }
760
761 /// Returns this singleton back into a top-level, asynchronous execution context where updates
762 /// to the value will be asynchronously propagated.
763 pub fn end_atomic(self) -> Singleton<T, L, B> {
764 Singleton::new(
765 self.location.tick.l.clone(),
766 HydroNode::EndAtomic {
767 inner: Box::new(self.ir_node.into_inner()),
768 metadata: self
769 .location
770 .tick
771 .l
772 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
773 },
774 )
775 }
776}
777
778impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
779where
780 L: Location<'a>,
781{
782 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
783 /// will observe the same version of the value and will be executed synchronously before any
784 /// outputs are yielded (in [`Optional::end_atomic`]).
785 ///
786 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
787 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
788 /// a different version).
789 ///
790 /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
791 /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
792 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
793 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
794 let out_location = Atomic { tick: tick.clone() };
795 Singleton::new(
796 out_location.clone(),
797 HydroNode::BeginAtomic {
798 inner: Box::new(self.ir_node.into_inner()),
799 metadata: out_location
800 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
801 },
802 )
803 }
804
805 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
806 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
807 /// relevant data that contributed to the snapshot at tick `t`.
808 ///
809 /// # Non-Determinism
810 /// Because this picks a snapshot of a singleton whose value is continuously changing,
811 /// the output singleton has a non-deterministic value since the snapshot can be at an
812 /// arbitrary point in time.
813 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
814 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
815 Singleton::new(
816 tick.clone(),
817 HydroNode::Batch {
818 inner: Box::new(self.ir_node.into_inner()),
819 metadata: tick
820 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
821 },
822 )
823 }
824
    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
    /// with order corresponding to increasing prefixes of data contributing to the singleton.
    ///
    /// The output is typed [`AtLeastOnce`]: the stream built inside `sliced!` is passed
    /// through `weaken_retries()` before being returned.
    ///
    /// # Non-Determinism
    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
    /// to non-deterministic batching and arrival of inputs, the output stream is
    /// non-deterministic.
    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
    where
        L: NoTick,
    {
        // Inside the slice, `snapshot` is bound from `self` via `use(self, nondet)` and is
        // emitted as a stream.
        sliced! {
            let snapshot = use(self, nondet);
            snapshot.into_stream()
        }
        .weaken_retries()
    }
842
    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
    /// value taken at various points in time. Because the input singleton may be
    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
    /// represent the value of the singleton given some prefix of the streams leading up to
    /// it.
    ///
    /// The output is typed [`AtLeastOnce`]: the stream built inside `sliced!` is passed
    /// through `weaken_retries()` before being returned.
    ///
    /// # Non-Determinism
    /// The output stream is non-deterministic in which elements are sampled, since this
    /// is controlled by a clock.
    pub fn sample_every(
        self,
        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
        nondet: NonDet,
    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
    where
        L: NoTick + NoAtomic,
    {
        // Interval source created on this singleton's own location; it acts as the sampling
        // signal below.
        let samples = self.location.source_interval(interval, nondet);
        // Inside the slice, the snapshot is passed through only when `sample_batch.first()`
        // is non-null (see `filter_if_some`), and is then emitted as a stream.
        sliced! {
            let snapshot = use(self, nondet);
            let sample_batch = use(samples, nondet);

            snapshot.filter_if_some(sample_batch.first()).into_stream()
        }
        .weaken_retries()
    }
869}
870
871impl<'a, T, L> Singleton<T, L, Bounded>
872where
873 L: Location<'a>,
874{
875 /// Clones this bounded singleton into a tick, returning a singleton that has the
876 /// same value as the outer singleton. Because the outer singleton is bounded, this
877 /// is deterministic because there is only a single immutable version.
878 pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
879 where
880 T: Clone,
881 {
882 // TODO(shadaj): avoid printing simulator logs for this snapshot
883 self.snapshot(
884 tick,
885 nondet!(/** bounded top-level singleton so deterministic */),
886 )
887 }
888
889 /// Converts this singleton into a [`Stream`] containing a single element, the value.
890 ///
891 /// # Example
892 /// ```rust
893 /// # #[cfg(feature = "deploy")] {
894 /// # use hydro_lang::prelude::*;
895 /// # use futures::StreamExt;
896 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
897 /// let tick = process.tick();
898 /// let batch_input = process
899 /// .source_iter(q!(vec![123, 456]))
900 /// .batch(&tick, nondet!(/** test */));
901 /// batch_input.clone().chain(
902 /// batch_input.count().into_stream()
903 /// ).all_ticks()
904 /// # }, |mut stream| async move {
905 /// // [123, 456, 2]
906 /// # for w in vec![123, 456, 2] {
907 /// # assert_eq!(stream.next().await.unwrap(), w);
908 /// # }
909 /// # }));
910 /// # }
911 /// ```
912 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
913 Stream::new(
914 self.location.clone(),
915 HydroNode::Cast {
916 inner: Box::new(self.ir_node.into_inner()),
917 metadata: self.location.new_node_metadata(Stream::<
918 T,
919 Tick<L>,
920 Bounded,
921 TotalOrder,
922 ExactlyOnce,
923 >::collection_kind()),
924 },
925 )
926 }
927}
928
929impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
930where
931 L: Location<'a>,
932{
933 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
934 /// which will stream the value computed in _each_ tick as a separate stream element.
935 ///
936 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
937 /// producing one element in the output for each tick. This is useful for batched computations,
938 /// where the results from each tick must be combined together.
939 ///
940 /// # Example
941 /// ```rust
942 /// # #[cfg(feature = "deploy")] {
943 /// # use hydro_lang::prelude::*;
944 /// # use futures::StreamExt;
945 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
946 /// let tick = process.tick();
947 /// # // ticks are lazy by default, forces the second tick to run
948 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
949 /// # let batch_first_tick = process
950 /// # .source_iter(q!(vec![1]))
951 /// # .batch(&tick, nondet!(/** test */));
952 /// # let batch_second_tick = process
953 /// # .source_iter(q!(vec![1, 2, 3]))
954 /// # .batch(&tick, nondet!(/** test */))
955 /// # .defer_tick(); // appears on the second tick
956 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
957 /// input_batch // first tick: [1], second tick: [1, 2, 3]
958 /// .count()
959 /// .all_ticks()
960 /// # }, |mut stream| async move {
961 /// // [1, 3]
962 /// # for w in vec![1, 3] {
963 /// # assert_eq!(stream.next().await.unwrap(), w);
964 /// # }
965 /// # }));
966 /// # }
967 /// ```
968 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
969 self.into_stream().all_ticks()
970 }
971
972 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
973 /// which will stream the value computed in _each_ tick as a separate stream element.
974 ///
975 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
976 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
977 /// singleton's [`Tick`] context.
978 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
979 self.into_stream().all_ticks_atomic()
980 }
981
982 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
983 /// be asynchronously updated with the latest value of the singleton inside the tick.
984 ///
985 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
986 /// tick that tracks the inner value. This is useful for getting the value as of the
987 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
988 ///
989 /// # Example
990 /// ```rust
991 /// # #[cfg(feature = "deploy")] {
992 /// # use hydro_lang::prelude::*;
993 /// # use futures::StreamExt;
994 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
995 /// let tick = process.tick();
996 /// # // ticks are lazy by default, forces the second tick to run
997 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
998 /// # let batch_first_tick = process
999 /// # .source_iter(q!(vec![1]))
1000 /// # .batch(&tick, nondet!(/** test */));
1001 /// # let batch_second_tick = process
1002 /// # .source_iter(q!(vec![1, 2, 3]))
1003 /// # .batch(&tick, nondet!(/** test */))
1004 /// # .defer_tick(); // appears on the second tick
1005 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1006 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1007 /// .count()
1008 /// .latest()
1009 /// # .sample_eager(nondet!(/** test */))
1010 /// # }, |mut stream| async move {
1011 /// // asynchronously changes from 1 ~> 3
1012 /// # for w in vec![1, 3] {
1013 /// # assert_eq!(stream.next().await.unwrap(), w);
1014 /// # }
1015 /// # }));
1016 /// # }
1017 /// ```
1018 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1019 Singleton::new(
1020 self.location.outer().clone(),
1021 HydroNode::YieldConcat {
1022 inner: Box::new(self.ir_node.into_inner()),
1023 metadata: self
1024 .location
1025 .outer()
1026 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1027 },
1028 )
1029 }
1030
1031 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1032 /// be updated with the latest value of the singleton inside the tick.
1033 ///
1034 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1035 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1036 /// singleton's [`Tick`] context.
1037 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1038 let out_location = Atomic {
1039 tick: self.location.clone(),
1040 };
1041 Singleton::new(
1042 out_location.clone(),
1043 HydroNode::YieldConcat {
1044 inner: Box::new(self.ir_node.into_inner()),
1045 metadata: out_location
1046 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1047 },
1048 )
1049 }
1050}
1051
#[doc(hidden)]
/// Helper trait that determines the output collection type for [`Singleton::zip`].
///
/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
/// [`Singleton`].
///
/// `Other` is the type of the second input collection. Implementors describe how to take that
/// input apart (its location and IR node) and how to wrap the zipped IR node back up into the
/// output collection. The trait is sealed, so it cannot be implemented outside this crate.
#[sealed::sealed]
pub trait ZipResult<'a, Other> {
    /// The output collection type.
    type Out;
    /// The type of the tupled output value.
    type ElementType;
    /// The type of the other collection's value.
    type OtherType;
    /// The location where the tupled result will be materialized.
    type Location: Location<'a>;

    /// The location of the second input to the `zip`.
    ///
    /// Takes `other` by reference, so the input remains usable afterwards.
    fn other_location(other: &Other) -> Self::Location;
    /// The IR node of the second input to the `zip`.
    ///
    /// Consumes `other`.
    fn other_ir_node(other: Other) -> HydroNode;

    /// Constructs the output live collection given an IR node containing the zip result.
    ///
    /// `location` is where the resulting collection is placed.
    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
}
1076
1077#[sealed::sealed]
1078impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1079where
1080 L: Location<'a>,
1081{
1082 type Out = Singleton<(T, U), L, B>;
1083 type ElementType = (T, U);
1084 type OtherType = U;
1085 type Location = L;
1086
1087 fn other_location(other: &Singleton<U, L, B>) -> L {
1088 other.location.clone()
1089 }
1090
1091 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1092 other.ir_node.into_inner()
1093 }
1094
1095 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1096 Singleton::new(
1097 location.clone(),
1098 HydroNode::Cast {
1099 inner: Box::new(ir_node),
1100 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1101 },
1102 )
1103 }
1104}
1105
1106#[sealed::sealed]
1107impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1108where
1109 L: Location<'a>,
1110{
1111 type Out = Optional<(T, U), L, B>;
1112 type ElementType = (T, U);
1113 type OtherType = U;
1114 type Location = L;
1115
1116 fn other_location(other: &Optional<U, L, B>) -> L {
1117 other.location.clone()
1118 }
1119
1120 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1121 other.ir_node.into_inner()
1122 }
1123
1124 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1125 Optional::new(location, ir_node)
1126 }
1127}
1128
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    // A singleton fed back through a tick cycle must still hold exactly one value in each
    // tick: the count of its stream form is expected to be 1 for two consecutive triggers.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (trigger_port, triggers) =
            process.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let tick = process.tick();
        let (cycle_handle, cycled) = tick.cycle_with_initial(tick.singleton(q!(0)));
        let cardinality = cycled.clone().into_stream().count();
        // Only emit the count in ticks where an external trigger arrived.
        let trigger_in_tick = triggers.batch(&tick, nondet!(/** testing */)).first();
        let counts_port = cardinality
            .filter_if_some(trigger_in_tick)
            .all_ticks()
            .send_bincode_external(&external);
        cycle_handle.complete_next_tick(cycled);

        let deployed = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut trigger_sink = deployed.connect(trigger_port).await;
        let mut counts_stream = deployed.connect(counts_port).await;

        deployment.start().await.unwrap();

        trigger_sink.send(()).await.unwrap();

        assert_eq!(counts_stream.next().await.unwrap(), 1);

        trigger_sink.send(()).await.unwrap();

        assert_eq!(counts_stream.next().await.unwrap(), 1);
    }

    // Expected to panic: asserting that the first snapshot of the fold is already the final
    // sum (10) should not hold in every explored execution (presumably because intermediate
    // fold states can be observed first).
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_fold_intermediate_states() {
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
        let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
        let out_recv = snapshots.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            assert_eq!(out_recv.next().await.unwrap(), 10);
        });
    }

    // The last observed snapshot must always be the final sum, and the exhaustive simulation
    // is expected to explore exactly 16 instances.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_intermediate_state_count() {
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
        let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
        let out_recv = snapshots.all_ticks().sim_output();

        let explored = flow.sim().exhaustive(async || {
            let observed = out_recv.collect::<Vec<_>>().await;
            assert_eq!(observed.last(), Some(&10));
        });

        assert_eq!(
            explored,
            16 // 2^4 possible subsets of intermediates (including initial state)
        )
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_no_repeat_initial() {
        // check that we don't repeat the initial state of the fold in autonomous decisions

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let (in_port, inputs) = process.sim_input();
        let running_sum = inputs.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
        let out_recv = snapshots.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            // The initial fold state is seen once...
            assert_eq!(out_recv.next().await.unwrap(), 0);

            in_port.send(123);

            // ...and the next output already reflects the sent value.
            assert_eq!(out_recv.next().await.unwrap(), 123);
        });
    }

    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_fold_repeats_snapshots() {
        // when the tick is driven by a snapshot AND something else, the snapshot can
        // "stutter" and repeat the same state multiple times

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
        let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let number_batch = numbers.batch(&tick, nondet!(/** test */));
        let sum_snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
        let paired = number_batch.cross_singleton(sum_snapshot);
        let out_recv = paired.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            // Two consecutive outputs paired with the same snapshot value (3) mean the
            // snapshot was repeated; the panic is what `should_panic` looks for.
            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
            {
                panic!("repeated snapshot");
            }
        });
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_repeats_snapshots_count() {
        // check the number of instances
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2])));
        let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let number_batch = numbers.batch(&tick, nondet!(/** test */));
        let sum_snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
        let paired = number_batch.cross_singleton(sum_snapshot);
        let out_recv = paired.all_ticks().sim_output();

        let explored = flow.sim().exhaustive(async || {
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(explored, 52);
        // don't have a combinatorial explanation for this number yet, but checked via logs
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_singleton_exhaustive() {
        // ensures that top-level singletons have only one snapshot
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let constant = process.singleton(q!(1));
        let tick = process.tick();
        let snapshots = constant.snapshot(&tick, nondet!(/** test */));
        let out_recv = snapshots.all_ticks().sim_output();

        let explored = flow.sim().exhaustive(async || {
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(explored, 1);
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_singleton_join_count() {
        // if a tick consumes a static snapshot and a stream batch, only the batch require space
        // exploration

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let tick = process.tick();
        let number_batch = numbers.batch(&tick, nondet!(/** test */));
        let constant_in_tick = process.singleton(q!(123)).clone_into_tick(&tick);
        let paired = number_batch.cross_singleton(constant_in_tick);
        let out_recv = paired.all_ticks().sim_output();

        let explored = flow.sim().exhaustive(async || {
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            explored,
            16 // 2^4 ways to split up (including a possibly empty first batch)
        )
    }

    // Turning a top-level singleton into a stream must yield its value exactly once: the only
    // output across all explored executions is the final sum.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_singleton_into_stream_no_replay() {
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let total = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

        let out_recv = total.into_stream().sim_output();

        flow.sim().exhaustive(async || {
            out_recv.assert_yields_only([10]).await;
        });
    }
}