hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
/// A single Rust value that can asynchronously change over time.
///
/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
/// [`Unbounded`], the value will asynchronously change over time.
///
/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
/// a single number that will asynchronously change as events are processed. Singletons also appear
/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
/// such as getting the length of a batch of requests.
///
/// Type Parameters:
/// - `Type`: the type of the value in this singleton
/// - `Loc`: the [`Location`] where the singleton is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: Boundedness> {
    /// The location at which this singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this singleton. It lives in a [`RefCell`] so that it can be
    /// replaced through a shared reference (cloning rewrites it into a `Tee` node in place).
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker carrying the `Type`, `Loc`, and `Bound` type parameters.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
44
45impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
46where
47 T: Clone,
48 L: Location<'a> + NoTick,
49{
50 fn from(value: Singleton<T, L, Bounded>) -> Self {
51 let tick = value.location().tick();
52 value.clone_into_tick(&tick).latest()
53 }
54}
55
56impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
57where
58 L: Location<'a>,
59{
60 type Location = Tick<L>;
61
62 fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
63 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
64 location.clone(),
65 HydroNode::DeferTick {
66 input: Box::new(HydroNode::CycleSource {
67 ident,
68 metadata: location.new_node_metadata(Self::collection_kind()),
69 }),
70 metadata: location
71 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
72 },
73 );
74
75 from_previous_tick.unwrap_or(initial)
76 }
77}
78
79impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
80where
81 L: Location<'a>,
82{
83 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
84 assert_eq!(
85 Location::id(&self.location),
86 expected_location,
87 "locations do not match"
88 );
89 self.location
90 .flow_state()
91 .borrow_mut()
92 .push_root(HydroRoot::CycleSink {
93 ident,
94 input: Box::new(self.ir_node.into_inner()),
95 op_metadata: HydroIrOpMetadata::new(),
96 });
97 }
98}
99
100impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
101where
102 L: Location<'a>,
103{
104 type Location = Tick<L>;
105
106 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
107 Singleton::new(
108 location.clone(),
109 HydroNode::CycleSource {
110 ident,
111 metadata: location.new_node_metadata(Self::collection_kind()),
112 },
113 )
114 }
115}
116
117impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
118where
119 L: Location<'a>,
120{
121 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
122 assert_eq!(
123 Location::id(&self.location),
124 expected_location,
125 "locations do not match"
126 );
127 self.location
128 .flow_state()
129 .borrow_mut()
130 .push_root(HydroRoot::CycleSink {
131 ident,
132 input: Box::new(self.ir_node.into_inner()),
133 op_metadata: HydroIrOpMetadata::new(),
134 });
135 }
136}
137
138impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
139where
140 L: Location<'a> + NoTick,
141{
142 type Location = L;
143
144 fn create_source(ident: syn::Ident, location: L) -> Self {
145 Singleton::new(
146 location.clone(),
147 HydroNode::CycleSource {
148 ident,
149 metadata: location.new_node_metadata(Self::collection_kind()),
150 },
151 )
152 }
153}
154
155impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
156where
157 L: Location<'a> + NoTick,
158{
159 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
160 assert_eq!(
161 Location::id(&self.location),
162 expected_location,
163 "locations do not match"
164 );
165 self.location
166 .flow_state()
167 .borrow_mut()
168 .push_root(HydroRoot::CycleSink {
169 ident,
170 input: Box::new(self.ir_node.into_inner()),
171 op_metadata: HydroIrOpMetadata::new(),
172 });
173 }
174}
175
176impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
177where
178 T: Clone,
179 L: Location<'a>,
180{
181 fn clone(&self) -> Self {
182 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
183 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
184 *self.ir_node.borrow_mut() = HydroNode::Tee {
185 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
186 metadata: self.location.new_node_metadata(Self::collection_kind()),
187 };
188 }
189
190 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
191 Singleton {
192 location: self.location.clone(),
193 ir_node: HydroNode::Tee {
194 inner: TeeNode(inner.0.clone()),
195 metadata: metadata.clone(),
196 }
197 .into(),
198 _phantom: PhantomData,
199 }
200 } else {
201 unreachable!()
202 }
203 }
204}
205
/// Zips a singleton that lives inside a tick with `other` by crossing their IR nodes.
///
/// Checks that both sides are at the same location, builds a `HydroNode::CrossSingleton`
/// over the two IR nodes, and hands it to `ZipResult::make` to produce the output
/// collection.
///
/// # Panics
/// Relies on `check_matching_location` to reject mismatched locations.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: O,
) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
where
    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
{
    check_matching_location(
        &me.location,
        &Singleton::<T, Tick<L>, B>::other_location(&other),
    );

    Singleton::<T, Tick<L>, B>::make(
        me.location.clone(),
        HydroNode::CrossSingleton {
            left: Box::new(me.ir_node.into_inner()),
            right: Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other)),
            // `other` may be an `Optional` (it always is when called from `Singleton::zip`),
            // so the crossed value may be absent. Describe the node as an optional, exactly
            // as the in-tick branch of `Singleton::zip` does for the same node shape.
            // NOTE(review): assumes `ZipResult::make` adapts this to the final output kind,
            // as it must already do for the in-tick branch — confirm against the trait impls.
            metadata: me.location.new_node_metadata(CollectionKind::Optional {
                bound: B::BOUND_KIND,
                element_type: stageleft::quote_type::<
                    <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
                >()
                .into(),
            }),
        },
    )
}
234
235impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
236where
237 L: Location<'a>,
238{
239 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
240 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
241 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
242 Singleton {
243 location,
244 ir_node: RefCell::new(ir_node),
245 _phantom: PhantomData,
246 }
247 }
248
249 pub(crate) fn collection_kind() -> CollectionKind {
250 CollectionKind::Singleton {
251 bound: B::BOUND_KIND,
252 element_type: stageleft::quote_type::<T>().into(),
253 }
254 }
255
256 /// Returns the [`Location`] where this singleton is being materialized.
257 pub fn location(&self) -> &L {
258 &self.location
259 }
260
261 /// Transforms the singleton value by applying a function `f` to it,
262 /// continuously as the input is updated.
263 ///
264 /// # Example
265 /// ```rust
266 /// # #[cfg(feature = "deploy")] {
267 /// # use hydro_lang::prelude::*;
268 /// # use futures::StreamExt;
269 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
270 /// let tick = process.tick();
271 /// let singleton = tick.singleton(q!(5));
272 /// singleton.map(q!(|v| v * 2)).all_ticks()
273 /// # }, |mut stream| async move {
274 /// // 10
275 /// # assert_eq!(stream.next().await.unwrap(), 10);
276 /// # }));
277 /// # }
278 /// ```
279 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
280 where
281 F: Fn(T) -> U + 'a,
282 {
283 let f = f.splice_fn1_ctx(&self.location).into();
284 Singleton::new(
285 self.location.clone(),
286 HydroNode::Map {
287 f,
288 input: Box::new(self.ir_node.into_inner()),
289 metadata: self
290 .location
291 .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
292 },
293 )
294 }
295
296 /// Transforms the singleton value by applying a function `f` to it and then flattening
297 /// the result into a stream, preserving the order of elements.
298 ///
299 /// The function `f` is applied to the singleton value to produce an iterator, and all items
300 /// from that iterator are emitted in the output stream in deterministic order.
301 ///
302 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
303 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
304 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
305 ///
306 /// # Example
307 /// ```rust
308 /// # #[cfg(feature = "deploy")] {
309 /// # use hydro_lang::prelude::*;
310 /// # use futures::StreamExt;
311 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
312 /// let tick = process.tick();
313 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
314 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
315 /// # }, |mut stream| async move {
316 /// // 1, 2, 3
317 /// # for w in vec![1, 2, 3] {
318 /// # assert_eq!(stream.next().await.unwrap(), w);
319 /// # }
320 /// # }));
321 /// # }
322 /// ```
323 pub fn flat_map_ordered<U, I, F>(
324 self,
325 f: impl IntoQuotedMut<'a, F, L>,
326 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
327 where
328 I: IntoIterator<Item = U>,
329 F: Fn(T) -> I + 'a,
330 {
331 let f = f.splice_fn1_ctx(&self.location).into();
332 Stream::new(
333 self.location.clone(),
334 HydroNode::FlatMap {
335 f,
336 input: Box::new(self.ir_node.into_inner()),
337 metadata: self.location.new_node_metadata(
338 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
339 ),
340 },
341 )
342 }
343
344 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
345 /// for the output type `I` to produce items in any order.
346 ///
347 /// The function `f` is applied to the singleton value to produce an iterator, and all items
348 /// from that iterator are emitted in the output stream in non-deterministic order.
349 ///
350 /// # Example
351 /// ```rust
352 /// # #[cfg(feature = "deploy")] {
353 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
354 /// # use futures::StreamExt;
355 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
356 /// let tick = process.tick();
357 /// let singleton = tick.singleton(q!(
358 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
359 /// ));
360 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
361 /// # }, |mut stream| async move {
362 /// // 1, 2, 3, but in no particular order
363 /// # let mut results = Vec::new();
364 /// # for _ in 0..3 {
365 /// # results.push(stream.next().await.unwrap());
366 /// # }
367 /// # results.sort();
368 /// # assert_eq!(results, vec![1, 2, 3]);
369 /// # }));
370 /// # }
371 /// ```
372 pub fn flat_map_unordered<U, I, F>(
373 self,
374 f: impl IntoQuotedMut<'a, F, L>,
375 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
376 where
377 I: IntoIterator<Item = U>,
378 F: Fn(T) -> I + 'a,
379 {
380 let f = f.splice_fn1_ctx(&self.location).into();
381 Stream::new(
382 self.location.clone(),
383 HydroNode::FlatMap {
384 f,
385 input: Box::new(self.ir_node.into_inner()),
386 metadata: self
387 .location
388 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
389 },
390 )
391 }
392
    /// Flattens the singleton value into a stream, preserving the order of elements.
    ///
    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
    /// are emitted in the output stream in deterministic order.
    ///
    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
    ///
    /// This is shorthand for [`Singleton::flat_map_ordered`] with the identity function.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
    /// singleton.flatten_ordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // The identity closure hands the value itself to `flat_map_ordered` as the iterator.
        self.flat_map_ordered(q!(|x| x))
    }
425
    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
    /// are emitted in the output stream in non-deterministic order.
    ///
    /// This is shorthand for [`Singleton::flat_map_unordered`] with the identity function.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let singleton = tick.singleton(q!(
    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
    /// ));
    /// singleton.flatten_unordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, but in no particular order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // The identity closure hands the value itself to `flat_map_unordered` as the iterator.
        self.flat_map_unordered(q!(|x| x))
    }
460
461 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
462 ///
463 /// If the predicate returns `true`, the output optional contains the same value.
464 /// If the predicate returns `false`, the output optional is empty.
465 ///
466 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
467 /// not modify or take ownership of the value. If you need to modify the value while filtering
468 /// use [`Singleton::filter_map`] instead.
469 ///
470 /// # Example
471 /// ```rust
472 /// # #[cfg(feature = "deploy")] {
473 /// # use hydro_lang::prelude::*;
474 /// # use futures::StreamExt;
475 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
476 /// let tick = process.tick();
477 /// let singleton = tick.singleton(q!(5));
478 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
479 /// # }, |mut stream| async move {
480 /// // 5
481 /// # assert_eq!(stream.next().await.unwrap(), 5);
482 /// # }));
483 /// # }
484 /// ```
485 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
486 where
487 F: Fn(&T) -> bool + 'a,
488 {
489 let f = f.splice_fn1_borrow_ctx(&self.location).into();
490 Optional::new(
491 self.location.clone(),
492 HydroNode::Filter {
493 f,
494 input: Box::new(self.ir_node.into_inner()),
495 metadata: self
496 .location
497 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
498 },
499 )
500 }
501
502 /// An operator that both filters and maps. It yields the value only if the supplied
503 /// closure `f` returns `Some(value)`.
504 ///
505 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
506 /// If the closure returns `None`, the output optional is empty.
507 ///
508 /// # Example
509 /// ```rust
510 /// # #[cfg(feature = "deploy")] {
511 /// # use hydro_lang::prelude::*;
512 /// # use futures::StreamExt;
513 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
514 /// let tick = process.tick();
515 /// let singleton = tick.singleton(q!("42"));
516 /// singleton
517 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
518 /// .all_ticks()
519 /// # }, |mut stream| async move {
520 /// // 42
521 /// # assert_eq!(stream.next().await.unwrap(), 42);
522 /// # }));
523 /// # }
524 /// ```
525 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
526 where
527 F: Fn(T) -> Option<U> + 'a,
528 {
529 let f = f.splice_fn1_ctx(&self.location).into();
530 Optional::new(
531 self.location.clone(),
532 HydroNode::FilterMap {
533 f,
534 input: Box::new(self.ir_node.into_inner()),
535 metadata: self
536 .location
537 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
538 },
539 )
540 }
541
542 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
543 ///
544 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
545 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
546 /// non-null. This is useful for combining several pieces of state together.
547 ///
548 /// # Example
549 /// ```rust
550 /// # #[cfg(feature = "deploy")] {
551 /// # use hydro_lang::prelude::*;
552 /// # use futures::StreamExt;
553 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
554 /// let tick = process.tick();
555 /// let numbers = process
556 /// .source_iter(q!(vec![123, 456]))
557 /// .batch(&tick, nondet!(/** test */));
558 /// let count = numbers.clone().count(); // Singleton
559 /// let max = numbers.max(); // Optional
560 /// count.zip(max).all_ticks()
561 /// # }, |mut stream| async move {
562 /// // [(2, 456)]
563 /// # for w in vec![(2, 456)] {
564 /// # assert_eq!(stream.next().await.unwrap(), w);
565 /// # }
566 /// # }));
567 /// # }
568 /// ```
569 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
570 where
571 Self: ZipResult<'a, O, Location = L>,
572 {
573 check_matching_location(&self.location, &Self::other_location(&other));
574
575 if L::is_top_level()
576 && let Some(tick) = self.location.try_tick()
577 {
578 let out = zip_inside_tick(
579 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
580 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
581 Self::other_location(&other),
582 Self::other_ir_node(other),
583 )
584 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
585 )
586 .latest();
587
588 Self::make(out.location, out.ir_node.into_inner())
589 } else {
590 Self::make(
591 self.location.clone(),
592 HydroNode::CrossSingleton {
593 left: Box::new(self.ir_node.into_inner()),
594 right: Box::new(Self::other_ir_node(other)),
595 metadata: self.location.new_node_metadata(CollectionKind::Optional {
596 bound: B::BOUND_KIND,
597 element_type: stageleft::quote_type::<
598 <Self as ZipResult<'a, O>>::ElementType,
599 >()
600 .into(),
601 }),
602 },
603 )
604 }
605 }
606
    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
    ///
    /// Useful for conditionally processing, such as only emitting a singleton's value outside
    /// a tick if some other condition is satisfied.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![1, 2, 3]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick).count()
    ///     .filter_if_some(some_on_first_tick)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1]
    /// # for w in vec![1] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
        // Reduce the signal to `()` so only its presence matters, zip it with this value, then
        // keep just this singleton's half of the resulting pair.
        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
            .map(q!(|(d, _signal)| d))
    }
646
    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is null.
    ///
    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
    /// the condition.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![1, 2, 3]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick).count()
    ///     .filter_if_none(some_on_first_tick)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [3]
    /// # for w in vec![3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
        // Build the inverted signal and reuse `filter_if_some`: the signal keeps a value only
        // when `is_none()` holds for the singleton form of `other`.
        // NOTE(review): presumably `into_singleton()` yields `None` exactly when `other` is
        // empty — confirm against `Optional::into_singleton`.
        self.filter_if_some(
            other
                .map(q!(|_| ()))
                .into_singleton()
                .filter(q!(|o| o.is_none())),
        )
    }
690
691 /// An operator which allows you to "name" a `HydroNode`.
692 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
693 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
694 {
695 let mut node = self.ir_node.borrow_mut();
696 let metadata = node.metadata_mut();
697 metadata.tag = Some(name.to_string());
698 }
699 self
700 }
701}
702
703impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
704where
705 L: Location<'a> + NoTick,
706{
707 /// Returns a singleton value corresponding to the latest snapshot of the singleton
708 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
709 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
710 /// all snapshots of this singleton into the atomic-associated tick will observe the
711 /// same value each tick.
712 ///
713 /// # Non-Determinism
714 /// Because this picks a snapshot of a singleton whose value is continuously changing,
715 /// the output singleton has a non-deterministic value since the snapshot can be at an
716 /// arbitrary point in time.
717 pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
718 Singleton::new(
719 self.location.clone().tick,
720 HydroNode::Batch {
721 inner: Box::new(self.ir_node.into_inner()),
722 metadata: self
723 .location
724 .tick
725 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
726 },
727 )
728 }
729
730 /// Returns this singleton back into a top-level, asynchronous execution context where updates
731 /// to the value will be asynchronously propagated.
732 pub fn end_atomic(self) -> Singleton<T, L, B> {
733 Singleton::new(
734 self.location.tick.l.clone(),
735 HydroNode::EndAtomic {
736 inner: Box::new(self.ir_node.into_inner()),
737 metadata: self
738 .location
739 .tick
740 .l
741 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
742 },
743 )
744 }
745}
746
747impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
748where
749 L: Location<'a>,
750{
751 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
752 /// will observe the same version of the value and will be executed synchronously before any
753 /// outputs are yielded (in [`Optional::end_atomic`]).
754 ///
755 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
756 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
757 /// a different version).
758 ///
759 /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
760 /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
761 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
762 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
763 let out_location = Atomic { tick: tick.clone() };
764 Singleton::new(
765 out_location.clone(),
766 HydroNode::BeginAtomic {
767 inner: Box::new(self.ir_node.into_inner()),
768 metadata: out_location
769 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
770 },
771 )
772 }
773
774 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
775 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
776 /// relevant data that contributed to the snapshot at tick `t`.
777 ///
778 /// # Non-Determinism
779 /// Because this picks a snapshot of a singleton whose value is continuously changing,
780 /// the output singleton has a non-deterministic value since the snapshot can be at an
781 /// arbitrary point in time.
782 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
783 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
784 Singleton::new(
785 tick.clone(),
786 HydroNode::Batch {
787 inner: Box::new(self.ir_node.into_inner()),
788 metadata: tick
789 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
790 },
791 )
792 }
793
    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
    /// with order corresponding to increasing prefixes of data contributing to the singleton.
    ///
    /// # Non-Determinism
    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
    /// to non-deterministic batching and arrival of inputs, the output stream is
    /// non-deterministic.
    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
    where
        L: NoTick,
    {
        // NOTE(review): `sliced!` presumably evaluates this body per slice with `snapshot`
        // bound to the singleton's value at that point — confirm against the macro's docs.
        sliced! {
            let snapshot = use(self, nondet);
            snapshot.into_stream()
        }
        // Produces the `AtLeastOnce` stream declared in the return type.
        .weakest_retries()
    }
811
    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
    /// value taken at various points in time. Because the input singleton may be
    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
    /// represent the value of the singleton given some prefix of the streams leading up to
    /// it.
    ///
    /// `interval` is forwarded to `source_interval` on this singleton's location, and `nondet`
    /// is passed both to that source and to each `use` inside the slice.
    ///
    /// # Non-Determinism
    /// The output stream is non-deterministic in which elements are sampled, since this
    /// is controlled by a clock.
    pub fn sample_every(
        self,
        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
        nondet: NonDet,
    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
    where
        L: NoTick + NoAtomic,
    {
        // Clock-driven stream of samples created at this singleton's location.
        let samples = self.location.source_interval(interval, nondet);
        sliced! {
            let snapshot = use(self, nondet);
            let sample_batch = use(samples, nondet);

            // Emit the snapshot only when the sample batch has a first element.
            snapshot.filter_if_some(sample_batch.first()).into_stream()
        }
        // Produces the `AtLeastOnce` stream declared in the return type.
        .weakest_retries()
    }
838}
839
840impl<'a, T, L> Singleton<T, L, Bounded>
841where
842 L: Location<'a>,
843{
844 /// Clones this bounded singleton into a tick, returning a singleton that has the
845 /// same value as the outer singleton. Because the outer singleton is bounded, this
846 /// is deterministic because there is only a single immutable version.
847 pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
848 where
849 T: Clone,
850 {
851 // TODO(shadaj): avoid printing simulator logs for this snapshot
852 self.snapshot(
853 tick,
854 nondet!(/** bounded top-level singleton so deterministic */),
855 )
856 }
857
858 /// Converts this singleton into a [`Stream`] containing a single element, the value.
859 ///
860 /// # Example
861 /// ```rust
862 /// # #[cfg(feature = "deploy")] {
863 /// # use hydro_lang::prelude::*;
864 /// # use futures::StreamExt;
865 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
866 /// let tick = process.tick();
867 /// let batch_input = process
868 /// .source_iter(q!(vec![123, 456]))
869 /// .batch(&tick, nondet!(/** test */));
870 /// batch_input.clone().chain(
871 /// batch_input.count().into_stream()
872 /// ).all_ticks()
873 /// # }, |mut stream| async move {
874 /// // [123, 456, 2]
875 /// # for w in vec![123, 456, 2] {
876 /// # assert_eq!(stream.next().await.unwrap(), w);
877 /// # }
878 /// # }));
879 /// # }
880 /// ```
881 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
882 Stream::new(
883 self.location.clone(),
884 HydroNode::Cast {
885 inner: Box::new(self.ir_node.into_inner()),
886 metadata: self.location.new_node_metadata(Stream::<
887 T,
888 Tick<L>,
889 Bounded,
890 TotalOrder,
891 ExactlyOnce,
892 >::collection_kind()),
893 },
894 )
895 }
896}
897
898impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
899where
900 L: Location<'a>,
901{
902 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
903 /// which will stream the value computed in _each_ tick as a separate stream element.
904 ///
905 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
906 /// producing one element in the output for each tick. This is useful for batched computations,
907 /// where the results from each tick must be combined together.
908 ///
909 /// # Example
910 /// ```rust
911 /// # #[cfg(feature = "deploy")] {
912 /// # use hydro_lang::prelude::*;
913 /// # use futures::StreamExt;
914 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
915 /// let tick = process.tick();
916 /// # // ticks are lazy by default, forces the second tick to run
917 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
918 /// # let batch_first_tick = process
919 /// # .source_iter(q!(vec![1]))
920 /// # .batch(&tick, nondet!(/** test */));
921 /// # let batch_second_tick = process
922 /// # .source_iter(q!(vec![1, 2, 3]))
923 /// # .batch(&tick, nondet!(/** test */))
924 /// # .defer_tick(); // appears on the second tick
925 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
926 /// input_batch // first tick: [1], second tick: [1, 2, 3]
927 /// .count()
928 /// .all_ticks()
929 /// # }, |mut stream| async move {
930 /// // [1, 3]
931 /// # for w in vec![1, 3] {
932 /// # assert_eq!(stream.next().await.unwrap(), w);
933 /// # }
934 /// # }));
935 /// # }
936 /// ```
937 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
938 self.into_stream().all_ticks()
939 }
940
941 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
942 /// which will stream the value computed in _each_ tick as a separate stream element.
943 ///
944 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
945 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
946 /// singleton's [`Tick`] context.
947 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
948 self.into_stream().all_ticks_atomic()
949 }
950
951 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
952 /// be asynchronously updated with the latest value of the singleton inside the tick.
953 ///
954 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
955 /// tick that tracks the inner value. This is useful for getting the value as of the
956 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
957 ///
958 /// # Example
959 /// ```rust
960 /// # #[cfg(feature = "deploy")] {
961 /// # use hydro_lang::prelude::*;
962 /// # use futures::StreamExt;
963 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
964 /// let tick = process.tick();
965 /// # // ticks are lazy by default, forces the second tick to run
966 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
967 /// # let batch_first_tick = process
968 /// # .source_iter(q!(vec![1]))
969 /// # .batch(&tick, nondet!(/** test */));
970 /// # let batch_second_tick = process
971 /// # .source_iter(q!(vec![1, 2, 3]))
972 /// # .batch(&tick, nondet!(/** test */))
973 /// # .defer_tick(); // appears on the second tick
974 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
975 /// input_batch // first tick: [1], second tick: [1, 2, 3]
976 /// .count()
977 /// .latest()
978 /// # .sample_eager(nondet!(/** test */))
979 /// # }, |mut stream| async move {
980 /// // asynchronously changes from 1 ~> 3
981 /// # for w in vec![1, 3] {
982 /// # assert_eq!(stream.next().await.unwrap(), w);
983 /// # }
984 /// # }));
985 /// # }
986 /// ```
987 pub fn latest(self) -> Singleton<T, L, Unbounded> {
988 Singleton::new(
989 self.location.outer().clone(),
990 HydroNode::YieldConcat {
991 inner: Box::new(self.ir_node.into_inner()),
992 metadata: self
993 .location
994 .outer()
995 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
996 },
997 )
998 }
999
1000 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1001 /// be updated with the latest value of the singleton inside the tick.
1002 ///
1003 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1004 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1005 /// singleton's [`Tick`] context.
1006 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1007 let out_location = Atomic {
1008 tick: self.location.clone(),
1009 };
1010 Singleton::new(
1011 out_location.clone(),
1012 HydroNode::YieldConcat {
1013 inner: Box::new(self.ir_node.into_inner()),
1014 metadata: out_location
1015 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1016 },
1017 )
1018 }
1019}
1020
#[doc(hidden)]
/// Helper trait that determines the output collection type for [`Singleton::zip`].
///
/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
/// [`Singleton`].
///
/// The trait is implemented on the first input of the `zip`; the `Other` type parameter is the
/// collection type of the second input. It is marked `#[sealed::sealed]`.
#[sealed::sealed]
pub trait ZipResult<'a, Other> {
    /// The output collection type.
    type Out;
    /// The type of the tupled output value.
    type ElementType;
    /// The type of the other collection's value.
    type OtherType;
    /// The location where the tupled result will be materialized.
    type Location: Location<'a>;

    /// The location of the second input to the `zip`.
    ///
    /// Takes `other` by reference, so the second input is still available afterwards.
    fn other_location(other: &Other) -> Self::Location;
    /// The IR node of the second input to the `zip`.
    ///
    /// Consumes `other`.
    fn other_ir_node(other: Other) -> HydroNode;

    /// Constructs the output live collection given an IR node containing the zip result.
    ///
    /// `location` is where the resulting collection is placed.
    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
}
1045
1046#[sealed::sealed]
1047impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1048where
1049 L: Location<'a>,
1050{
1051 type Out = Singleton<(T, U), L, B>;
1052 type ElementType = (T, U);
1053 type OtherType = U;
1054 type Location = L;
1055
1056 fn other_location(other: &Singleton<U, L, B>) -> L {
1057 other.location.clone()
1058 }
1059
1060 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1061 other.ir_node.into_inner()
1062 }
1063
1064 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1065 Singleton::new(
1066 location.clone(),
1067 HydroNode::Cast {
1068 inner: Box::new(ir_node),
1069 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1070 },
1071 )
1072 }
1073}
1074
1075#[sealed::sealed]
1076impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1077where
1078 L: Location<'a>,
1079{
1080 type Out = Optional<(T, U), L, B>;
1081 type ElementType = (T, U);
1082 type OtherType = U;
1083 type Location = L;
1084
1085 fn other_location(other: &Optional<U, L, B>) -> L {
1086 other.location.clone()
1087 }
1088
1089 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1090 other.ir_node.into_inner()
1091 }
1092
1093 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1094 Optional::new(location, ir_node)
1095 }
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100 #[cfg(feature = "deploy")]
1101 use futures::{SinkExt, StreamExt};
1102 #[cfg(feature = "deploy")]
1103 use hydro_deploy::Deployment;
1104 #[cfg(any(feature = "deploy", feature = "sim"))]
1105 use stageleft::q;
1106
1107 #[cfg(any(feature = "deploy", feature = "sim"))]
1108 use crate::compile::builder::FlowBuilder;
1109 #[cfg(feature = "deploy")]
1110 use crate::live_collections::stream::ExactlyOnce;
1111 #[cfg(any(feature = "deploy", feature = "sim"))]
1112 use crate::location::Location;
1113 #[cfg(any(feature = "deploy", feature = "sim"))]
1114 use crate::nondet::nondet;
1115
#[cfg(feature = "deploy")]
#[tokio::test]
async fn tick_cycle_cardinality() {
    // A singleton that is cycled through a tick must still have exactly one value per tick,
    // so counting its elements has to report `1` every time a tick is triggered.
    let mut deployment = Deployment::new();

    let builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let outside = builder.external::<()>();

    let (trigger_port, triggers) =
        process.source_external_bincode::<_, _, _, ExactlyOnce>(&outside);

    let tick = process.tick();
    let (cycle_handle, cycled) = tick.cycle_with_initial(tick.singleton(q!(0)));

    // Number of elements the cycled singleton holds in the current tick.
    let cardinality = cycled.clone().into_stream().count();
    // Only report a count in ticks that actually received a trigger.
    let tick_has_trigger = triggers.batch(&tick, nondet!(/** testing */)).first();
    let counts_port = cardinality
        .filter_if_some(tick_has_trigger)
        .all_ticks()
        .send_bincode_external(&outside);
    cycle_handle.complete_next_tick(cycled);

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&outside, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut trigger_sink = deployed.connect(trigger_port).await;
    let mut counts_out = deployed.connect(counts_port).await;

    deployment.start().await.unwrap();

    // Trigger twice; each trigger must be answered with a cardinality of one.
    for _ in 0..2 {
        trigger_sink.send(()).await.unwrap();

        assert_eq!(counts_out.next().await.unwrap(), 1);
    }
}
1158
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_intermediate_states() {
    // Expected to panic: requiring the first observed snapshot of the fold to already be the
    // final sum is an assertion the exhaustive simulation is supposed to break.
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
    let outputs = snapshots.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        let first_snapshot = outputs.next().await.unwrap();
        assert_eq!(first_snapshot, 10);
    });
}
1177
#[cfg(feature = "sim")]
#[test]
fn sim_fold_intermediate_state_count() {
    // Counts how many distinct executions the simulator explores when snapshotting a fold
    // over four inputs; the last snapshot must always be the full sum.
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
    let outputs = snapshots.all_ticks().sim_output();

    let explored_instances = builder.sim().exhaustive(async || {
        let observed = outputs.collect::<Vec<_>>().await;
        assert_eq!(observed.last(), Some(&10));
    });

    // 2^4 possible subsets of intermediates (including initial state)
    assert_eq!(explored_instances, 16)
}
1201
#[cfg(feature = "sim")]
#[test]
fn sim_fold_no_repeat_initial() {
    // The initial state of the fold must not be repeated by autonomous simulator decisions:
    // after the initial `0`, the next snapshot seen has to reflect the value that was sent.

    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_port, incoming) = process.sim_input();
    let running_sum = incoming.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
    let outputs = snapshots.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        let initial = outputs.next().await.unwrap();
        assert_eq!(initial, 0);

        in_port.send(123);

        let after_send = outputs.next().await.unwrap();
        assert_eq!(after_send, 123);
    });
}
1225
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_repeats_snapshots() {
    // When a tick is driven by a snapshot AND by something else, the snapshot may "stutter",
    // i.e. the same state can be observed in more than one tick. The test panics as soon as
    // such a repetition is seen, hence `should_panic`.

    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let batched_numbers = numbers.batch(&tick, nondet!(/** test */));
    let sum_snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
    let paired = batched_numbers.cross_singleton(sum_snapshot);
    let outputs = paired.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        // The second element is only pulled once the first one matched.
        if outputs.next().await.unwrap() == (1, 3) {
            if outputs.next().await.unwrap() == (2, 3) {
                panic!("repeated snapshot");
            }
        }
    });
}
1252
#[cfg(feature = "sim")]
#[test]
fn sim_fold_repeats_snapshots_count() {
    // Pins the number of instances explored when a batch is crossed with a fold snapshot.
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2])));
    let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let batched_numbers = numbers.batch(&tick, nondet!(/** test */));
    let sum_snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
    let paired = batched_numbers.cross_singleton(sum_snapshot);
    let outputs = paired.all_ticks().sim_output();

    let explored_instances = builder.sim().exhaustive(async || {
        let _ = outputs.collect::<Vec<_>>().await;
    });

    // There is no combinatorial explanation for this number yet; it was checked via logs.
    assert_eq!(explored_instances, 52);
}
1276
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_exhaustive() {
    // A top-level singleton must have exactly one snapshot, so the exhaustive simulation is
    // expected to explore a single instance.
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let constant = process.singleton(q!(1));
    let tick = process.tick();
    let snapshots = constant.snapshot(&tick, nondet!(/** test */));
    let outputs = snapshots.all_ticks().sim_output();

    let explored_instances = builder.sim().exhaustive(async || {
        let _ = outputs.collect::<Vec<_>>().await;
    });

    assert_eq!(explored_instances, 1);
}
1295
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_join_count() {
    // If a tick consumes a static snapshot and a stream batch, only the batch requires space
    // exploration.

    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    let tick = process.tick();
    let batched_numbers = numbers.batch(&tick, nondet!(/** test */));
    let constant_in_tick = process.singleton(q!(123)).clone_into_tick(&tick);
    let paired = batched_numbers.cross_singleton(constant_in_tick);
    let outputs = paired.all_ticks().sim_output();

    let explored_instances = builder.sim().exhaustive(async || {
        let _ = outputs.collect::<Vec<_>>().await;
    });

    // 2^4 ways to split up (including a possibly empty first batch)
    assert_eq!(explored_instances, 16)
}
1321
#[cfg(feature = "sim")]
#[test]
fn top_level_singleton_into_stream_no_replay() {
    // Turning a top-level folded singleton into a stream must yield the final value once and
    // nothing else.
    let builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    let total = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let outputs = total.into_stream().sim_output();

    builder.sim().exhaustive(async || {
        outputs.assert_yields_only([10]).await;
    });
}
1337}