hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::Atomic;
25use crate::location::{Location, Tick, TopLevel, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28 ApplyMonotoneStream, ApplyOrderPreservingSingleton, Proved, SingletonMapFuncAlgebra,
29 StreamMapFuncAlgebra, ValidMutCommutativityFor, ValidMutIdempotenceFor,
30};
31
/// A marker trait indicating which components of a [`Singleton`] may change.
///
/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
/// includes an additional variant [`Monotonic`], which means that the value will only grow.
///
/// Every implementor names the plain [`Boundedness`] it erases to and reports the
/// [`SingletonBoundKind`] that identifies it.
pub trait SingletonBound {
    /// The [`Boundedness`] that this [`Singleton`] would be erased to.
    ///
    /// This is the bound of the singleton returned by [`Singleton::ignore_monotonic`].
    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;

    /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
    ///
    /// It is required to erase to the same [`SingletonBound::UnderlyingBound`] as `Self`.
    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;

    /// Returns the [`SingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> SingletonBoundKind;
}
46
47impl SingletonBound for Unbounded {
48 type UnderlyingBound = Unbounded;
49
50 type StreamToMonotone = Monotonic;
51
52 fn bound_kind() -> SingletonBoundKind {
53 SingletonBoundKind::Unbounded
54 }
55}
56
57impl SingletonBound for Bounded {
58 type UnderlyingBound = Bounded;
59
60 type StreamToMonotone = Bounded;
61
62 fn bound_kind() -> SingletonBoundKind {
63 SingletonBoundKind::Bounded
64 }
65}
66
/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
///
/// As a [`SingletonBound`], it erases to [`Unbounded`] (see its `UnderlyingBound`).
pub struct Monotonic;
69
70impl SingletonBound for Monotonic {
71 type UnderlyingBound = Unbounded;
72
73 type StreamToMonotone = Monotonic;
74
75 fn bound_kind() -> SingletonBoundKind {
76 SingletonBoundKind::Monotonic
77 }
78}
79
#[sealed]
#[diagnostic::on_unimplemented(
    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee, as well as for
/// every bound that satisfies [`IsBounded`].
///
/// The trait is sealed, so no implementations can be added outside this crate.
pub trait IsMonotonic: SingletonBound {}
88
// `Monotonic` itself satisfies the marker; `do_not_recommend` keeps this impl out of
// compiler suggestions so the `on_unimplemented` message on the trait is shown instead.
#[sealed]
#[diagnostic::do_not_recommend]
impl IsMonotonic for Monotonic {}
92
// Blanket impl: every bound that is `IsBounded` also counts as `IsMonotonic`.
#[sealed]
#[diagnostic::do_not_recommend]
impl<B: IsBounded> IsMonotonic for B {}
96
/// A single Rust value that can asynchronously change over time.
///
/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
/// [`Unbounded`], the value will asynchronously change over time. A [`Monotonic`] singleton
/// also changes over time, but its value will only grow.
///
/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
/// a single number that will asynchronously change as events are processed. Singletons also appear
/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
/// such as getting the length of a batch of requests.
///
/// Type Parameters:
/// - `Type`: the type of the value in this singleton
/// - `Loc`: the [`Location`] where the singleton is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed), [`Unbounded`] (changing
///   asynchronously), or [`Monotonic`] (only growing); see [`SingletonBound`]
pub struct Singleton<Type, Loc, Bound: SingletonBound> {
    /// The location where this singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this singleton. Operators that consume the singleton swap it
    /// out for `HydroNode::Placeholder`.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow state handle; the `Drop` impl uses it to register a still-owned IR node as a
    /// `HydroRoot::Null` root.
    pub(crate) flow_state: FlowState,

    /// Carries the otherwise-unused type parameters.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
118
119impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
120 fn drop(&mut self) {
121 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
122 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
123 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
124 input: Box::new(ir_node),
125 op_metadata: HydroIrOpMetadata::new(),
126 });
127 }
128 }
129}
130
131impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
132where
133 T: Clone,
134 L: Location<'a>,
135{
136 fn from(value: Singleton<T, L, Bounded>) -> Self {
137 let location = value.location().clone();
138 Singleton::new(
139 location.clone(),
140 HydroNode::UnboundSingleton {
141 inner: Box::new(value.ir_node.replace(HydroNode::Placeholder)),
142 metadata: location
143 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
144 },
145 )
146 }
147}
148
149impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
150where
151 L: Location<'a>,
152{
153 type Location = Tick<L>;
154
155 fn location(&self) -> &Self::Location {
156 self.location()
157 }
158
159 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
160 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
161 location.clone(),
162 HydroNode::DeferTick {
163 input: Box::new(HydroNode::CycleSource {
164 cycle_id,
165 metadata: location.new_node_metadata(Self::collection_kind()),
166 }),
167 metadata: location
168 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
169 },
170 );
171
172 from_previous_tick.unwrap_or(initial)
173 }
174}
175
176impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
177where
178 L: Location<'a>,
179{
180 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
181 assert_eq!(
182 Location::id(&self.location),
183 expected_location,
184 "locations do not match"
185 );
186 self.location
187 .flow_state()
188 .borrow_mut()
189 .push_root(HydroRoot::CycleSink {
190 cycle_id,
191 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
192 op_metadata: HydroIrOpMetadata::new(),
193 });
194 }
195}
196
197impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
198where
199 L: Location<'a>,
200{
201 type Location = L;
202
203 fn create_source(cycle_id: CycleId, location: L) -> Self {
204 Singleton::new(
205 location.clone(),
206 HydroNode::CycleSource {
207 cycle_id,
208 metadata: location.new_node_metadata(Self::collection_kind()),
209 },
210 )
211 }
212}
213
214impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
215where
216 L: Location<'a>,
217{
218 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
219 assert_eq!(
220 Location::id(&self.location),
221 expected_location,
222 "locations do not match"
223 );
224 self.location
225 .flow_state()
226 .borrow_mut()
227 .push_root(HydroRoot::CycleSink {
228 cycle_id,
229 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
230 op_metadata: HydroIrOpMetadata::new(),
231 });
232 }
233}
234
235impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
236where
237 T: Clone,
238 L: Location<'a>,
239{
240 fn clone(&self) -> Self {
241 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
242 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
243 *self.ir_node.borrow_mut() = HydroNode::Tee {
244 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
245 metadata: self.location.new_node_metadata(Self::collection_kind()),
246 };
247 }
248
249 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
250 Singleton {
251 location: self.location.clone(),
252 flow_state: self.flow_state.clone(),
253 ir_node: HydroNode::Tee {
254 inner: SharedNode(inner.0.clone()),
255 metadata: metadata.clone(),
256 }
257 .into(),
258 _phantom: PhantomData,
259 }
260 } else {
261 unreachable!()
262 }
263 }
264}
265
/// Zips a tick-local singleton with an optional by first viewing the singleton as an
/// [`Optional`] and then delegating to the optional-level `zip_inside_tick`.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
    me: Singleton<T, Tick<L>, B>,
    other: Optional<O, Tick<L>, B::UnderlyingBound>,
) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
    super::optional::zip_inside_tick(
        Into::<Optional<T, Tick<L>, B::UnderlyingBound>>::into(me),
        other,
    )
}
274
275impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
276where
277 L: Location<'a>,
278{
279 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
280 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
281 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
282 let flow_state = location.flow_state().clone();
283 Singleton {
284 location,
285 flow_state,
286 ir_node: RefCell::new(ir_node),
287 _phantom: PhantomData,
288 }
289 }
290
291 pub(crate) fn collection_kind() -> CollectionKind {
292 CollectionKind::Singleton {
293 bound: B::bound_kind(),
294 element_type: stageleft::quote_type::<T>().into(),
295 }
296 }
297
    /// Returns the [`Location`] where this singleton is being materialized.
    ///
    /// The location is borrowed; clone it if an owned handle is needed.
    pub fn location(&self) -> &L {
        &self.location
    }
302
    /// Creates a lightweight reference handle to this singleton that can be captured
    /// inside `q!()` closures. The handle resolves to `&T` at runtime.
    ///
    /// The singleton must be bounded (enforced by the `B: IsBounded` bound), otherwise reading
    /// it would be non-deterministic.
    ///
    /// The returned handle borrows this singleton, so the singleton must outlive it.
    ///
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(async {
    /// # let mut deployment = hydro_deploy::Deployment::new();
    /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
    /// # let process = builder.process::<()>();
    /// # let external = builder.external::<()>();
    /// let my_count = process
    ///     .source_iter(q!(0..5i32))
    ///     .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
    /// let count_ref = my_count.by_ref();
    /// let out_port = process
    ///     .source_iter(q!(1..=3i32))
    ///     .map(q!(|x| x + *count_ref))
    ///     .send_bincode_external(&external);
    /// # let nodes = builder
    /// #     .with_default_optimize()
    /// #     .with_process(&process, deployment.Localhost())
    /// #     .with_external(&external, deployment.Localhost())
    /// #     .deploy(&mut deployment);
    /// # deployment.deploy().await.unwrap();
    /// # let mut out_recv = nodes.connect(out_port).await;
    /// # deployment.start().await.unwrap();
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
    /// # results.sort();
    /// // fold(0..5) = 10, so results are 11, 12, 13
    /// # assert_eq!(results, vec![11, 12, 13]);
    /// # });
    /// # }
    /// ```
    pub fn by_ref(&self) -> crate::handoff_ref::SingletonRef<'a, '_, T, L>
    where
        B: IsBounded,
    {
        crate::handoff_ref::SingletonRef::new(&self.ir_node)
    }
347
    /// Returns a mutable reference handle to this singleton that can be captured inside `q!()`
    /// closures. The handle resolves to `&mut T` at runtime.
    ///
    /// Mutable references are ordered via access groups in the generated DFIR code, ensuring
    /// exclusive access at each point in the execution order.
    ///
    /// Like [`Singleton::by_ref`], this is only available when the singleton is bounded
    /// (`B: IsBounded`), and the returned handle borrows this singleton.
    ///
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(async {
    /// # let mut deployment = hydro_deploy::Deployment::new();
    /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
    /// # let process = builder.process::<()>();
    /// # let external = builder.external::<()>();
    /// let my_count = process
    ///     .source_iter(q!(0..5i32))
    ///     .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
    /// let count_mut = my_count.by_mut();
    /// let out_port = process
    ///     .source_iter(q!(1..=3i32))
    ///     .map(q!(|x| {
    ///         *count_mut += x;
    ///         *count_mut
    ///     }))
    ///     .send_bincode_external(&external);
    /// # let nodes = builder
    /// #     .with_default_optimize()
    /// #     .with_process(&process, deployment.Localhost())
    /// #     .with_external(&external, deployment.Localhost())
    /// #     .deploy(&mut deployment);
    /// # deployment.deploy().await.unwrap();
    /// # let mut out_recv = nodes.connect(out_port).await;
    /// # deployment.start().await.unwrap();
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
    /// # results.sort();
    /// // fold(0..5) = 10, then each map adds x: results are 11, 13, 16
    /// # assert_eq!(results, vec![11, 13, 16]);
    /// # });
    /// # }
    /// ```
    pub fn by_mut(&self) -> crate::handoff_ref::SingletonMut<'a, '_, T, L>
    where
        B: IsBounded,
    {
        crate::handoff_ref::SingletonMut::new(&self.ir_node)
    }
396
397 /// Weakens the consistency of this live collection to not guarantee any consistency across
398 /// cluster members (if this collection is on a cluster).
399 pub fn weaken_consistency(self) -> Singleton<T, L::DropConsistency, B>
400 where
401 L: Location<'a>,
402 {
403 if L::consistency()
404 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
405 {
406 // already no consistency
407 Singleton::new(
408 self.location.drop_consistency(),
409 self.ir_node.replace(HydroNode::Placeholder),
410 )
411 } else {
412 Singleton::new(
413 self.location.drop_consistency(),
414 HydroNode::Cast {
415 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
416 metadata:
417 self.location
418 .clone()
419 .drop_consistency()
420 .new_node_metadata(
421 Singleton::<T, L::DropConsistency, B>::collection_kind(),
422 ),
423 },
424 )
425 }
426 }
427
428 /// Casts this live collection to have the consistency guarantees specified in the given
429 /// location type parameter. The developer must ensure that the strengthened consistency
430 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
431 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
432 self,
433 _proof: impl crate::properties::ConsistencyProof,
434 ) -> Singleton<T, L2, B>
435 where
436 L: Location<'a>,
437 {
438 if L::consistency() == L2::consistency() {
439 Singleton::new(
440 self.location.with_consistency_of(),
441 self.ir_node.replace(HydroNode::Placeholder),
442 )
443 } else {
444 Singleton::new(
445 self.location.with_consistency_of(),
446 HydroNode::AssertIsConsistent {
447 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
448 trusted: false,
449 metadata: self
450 .location
451 .clone()
452 .with_consistency_of::<L2>()
453 .new_node_metadata(Singleton::<T, L2, B>::collection_kind()),
454 },
455 )
456 }
457 }
458
459 /// Drops the monotonicity property of the [`Singleton`].
460 pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
461 if B::bound_kind() == B::UnderlyingBound::bound_kind() {
462 Singleton::new(
463 self.location.clone(),
464 self.ir_node.replace(HydroNode::Placeholder),
465 )
466 } else {
467 Singleton::new(
468 self.location.clone(),
469 HydroNode::Cast {
470 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
471 metadata:
472 self.location.new_node_metadata(
473 Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
474 ),
475 },
476 )
477 }
478 }
479
480 /// Transforms the singleton value by applying a function `f` to it,
481 /// continuously as the input is updated.
482 ///
483 /// # Example
484 /// ```rust
485 /// # #[cfg(feature = "deploy")] {
486 /// # use hydro_lang::prelude::*;
487 /// # use futures::StreamExt;
488 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
489 /// let tick = process.tick();
490 /// let singleton = tick.singleton(q!(5));
491 /// singleton.map(q!(|v| v * 2)).all_ticks()
492 /// # }, |mut stream| async move {
493 /// // 10
494 /// # assert_eq!(stream.next().await.unwrap(), 10);
495 /// # }));
496 /// # }
497 /// ```
498 pub fn map<U, F, OP, B2: SingletonBound>(
499 self,
500 f: impl IntoQuotedMut<'a, F, L, SingletonMapFuncAlgebra<OP>>,
501 ) -> Singleton<U, L, B2>
502 where
503 F: Fn(T) -> U + 'a,
504 B: ApplyOrderPreservingSingleton<OP, B2>,
505 {
506 let (f, proof) = f.splice_fn1_ctx_props(&self.location);
507 proof.register_proof(&f);
508 let f = f.into();
509 Singleton::new(
510 self.location.clone(),
511 HydroNode::Map {
512 f,
513 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
514 metadata: self
515 .location
516 .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
517 },
518 )
519 }
520
    /// Transforms the singleton value by applying a function `f` to it and then flattening
    /// the result into a stream, preserving the order of elements.
    ///
    /// The function `f` is applied to the singleton value to produce an iterator, and all items
    /// from that iterator are emitted in the output stream in deterministic order.
    ///
    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
    ///
    /// Only available on bounded singletons (`B: IsBounded`).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
        self,
        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
    where
        B: IsBounded,
        I: IntoIterator<Item = U>,
        F: FnMut(T) -> I + 'a,
        C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
        Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
    {
        // Convert to a stream and reuse the stream-level operator.
        self.into_stream().flat_map_ordered(f)
    }
561
    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `I` to produce items in any order.
    ///
    /// The function `f` is applied to the singleton value to produce an iterator, and all items
    /// from that iterator are emitted in the output stream in non-deterministic order.
    ///
    /// Only available on bounded singletons (`B: IsBounded`).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let singleton = tick.singleton(q!(
    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
    /// ));
    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, but in no particular order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
        self,
        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
    where
        B: IsBounded,
        I: IntoIterator<Item = U>,
        F: FnMut(T) -> I + 'a,
        C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
        Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
    {
        // Convert to a stream and reuse the stream-level operator.
        self.into_stream().flat_map_unordered(f)
    }
603
    /// Flattens the singleton value into a stream, preserving the order of elements.
    ///
    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
    /// are emitted in the output stream in deterministic order.
    ///
    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
    ///
    /// This is [`Singleton::flat_map_ordered`] with the identity function.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
    /// singleton.flatten_ordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|x| x))
    }
637
    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
    /// are emitted in the output stream in non-deterministic order.
    ///
    /// This is [`Singleton::flat_map_unordered`] with the identity function.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let singleton = tick.singleton(q!(
    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
    /// ));
    /// singleton.flatten_unordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, but in no particular order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|x| x))
    }
673
674 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
675 ///
676 /// If the predicate returns `true`, the output optional contains the same value.
677 /// If the predicate returns `false`, the output optional is empty.
678 ///
679 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
680 /// not modify or take ownership of the value. If you need to modify the value while filtering
681 /// use [`Singleton::filter_map`] instead.
682 ///
683 /// # Example
684 /// ```rust
685 /// # #[cfg(feature = "deploy")] {
686 /// # use hydro_lang::prelude::*;
687 /// # use futures::StreamExt;
688 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
689 /// let tick = process.tick();
690 /// let singleton = tick.singleton(q!(5));
691 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
692 /// # }, |mut stream| async move {
693 /// // 5
694 /// # assert_eq!(stream.next().await.unwrap(), 5);
695 /// # }));
696 /// # }
697 /// ```
698 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
699 where
700 F: Fn(&T) -> bool + 'a,
701 {
702 let f = f.splice_fn1_borrow_ctx(&self.location).into();
703 Optional::new(
704 self.location.clone(),
705 HydroNode::Filter {
706 f,
707 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
708 metadata: self
709 .location
710 .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
711 },
712 )
713 }
714
715 /// An operator that both filters and maps. It yields the value only if the supplied
716 /// closure `f` returns `Some(value)`.
717 ///
718 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
719 /// If the closure returns `None`, the output optional is empty.
720 ///
721 /// # Example
722 /// ```rust
723 /// # #[cfg(feature = "deploy")] {
724 /// # use hydro_lang::prelude::*;
725 /// # use futures::StreamExt;
726 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
727 /// let tick = process.tick();
728 /// let singleton = tick.singleton(q!("42"));
729 /// singleton
730 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
731 /// .all_ticks()
732 /// # }, |mut stream| async move {
733 /// // 42
734 /// # assert_eq!(stream.next().await.unwrap(), 42);
735 /// # }));
736 /// # }
737 /// ```
738 pub fn filter_map<U, F>(
739 self,
740 f: impl IntoQuotedMut<'a, F, L>,
741 ) -> Optional<U, L, B::UnderlyingBound>
742 where
743 F: Fn(T) -> Option<U> + 'a,
744 {
745 let f = f.splice_fn1_ctx(&self.location).into();
746 Optional::new(
747 self.location.clone(),
748 HydroNode::FilterMap {
749 f,
750 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
751 metadata: self
752 .location
753 .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
754 },
755 )
756 }
757
    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
    ///
    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
    /// non-null. This is useful for combining several pieces of state together.
    ///
    /// Both operands must be at the same location; this is checked with
    /// `check_matching_location` before any IR is built.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![123, 456]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let count = numbers.clone().count(); // Singleton
    /// let max = numbers.max(); // Optional
    /// count.zip(max).all_ticks()
    /// # }, |mut stream| async move {
    /// // [(2, 456)]
    /// # for w in vec![(2, 456)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
        B: IsBounded,
    {
        check_matching_location(&self.location, &Self::other_location(&other));

        // Top-level location that can hand out a tick: snapshot both sides into that tick,
        // zip them there, and surface the result again with `latest()`.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            // Captured up front because `self.snapshot(..)` below consumes `self`.
            let self_location = self.location().clone();
            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                // The other operand is re-typed as an `Optional` through a `Cast` node.
                // NOTE(review): the metadata kind is spelled with `Tick<L>` / `Bounded` while the
                // handle is typed with `L` / `B`; presumably equivalent since `B: IsBounded` and
                // the kind does not encode the location — confirm.
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    other_location.clone(),
                    HydroNode::Cast {
                        inner: Box::new(Self::other_ir_node(other)),
                        metadata: other_location.new_node_metadata(Optional::<
                            <Self as ZipResult<'a, O>>::OtherType,
                            Tick<L>,
                            Bounded,
                        >::collection_kind(
                        )),
                    },
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            Self::make(self_location, out.ir_node.replace(HydroNode::Placeholder))
        } else {
            // Otherwise a single `CrossSingleton` node pairs the two inputs; its metadata is
            // always declared with an `Optional` collection kind, and `Self::make` builds the
            // output handle from it.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    right: Box::new(Self::other_ir_node(other)),
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
833
    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// boolean signal is `true`, otherwise the output is null.
    ///
    /// Implemented by zipping with the signal filtered down to `true` and then discarding the
    /// signal half of the pair.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![1, 2, 3]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick();
    /// batch_first_tick.chain(batch_second_tick).count()
    ///     .filter_if(signal)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1]
    /// # for w in vec![1] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if(
        self,
        signal: Singleton<bool, L, B>,
    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
    where
        B: IsBounded,
    {
        // `signal.filter(..)` is non-null only when the signal is `true`, so the zip is
        // non-null exactly in that case; the trailing `map` keeps just the data value.
        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
    }
875
    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
    ///
    /// Useful for conditionally processing, such as only emitting a singleton's value outside
    /// a tick if some other condition is satisfied.
    ///
    /// Deprecated: this is a thin wrapper around [`Singleton::filter_if`] applied to
    /// `signal.is_some()`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![1, 2, 3]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick).count()
    ///     .filter_if_some(some_on_first_tick)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1]
    /// # for w in vec![1] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
    pub fn filter_if_some<U>(
        self,
        signal: Optional<U, L, B>,
    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
    where
        B: IsBounded,
    {
        self.filter_if(signal.is_some())
    }
921
922 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
923 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
924 ///
925 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
926 /// the condition.
927 ///
928 /// # Example
929 /// ```rust
930 /// # #[cfg(feature = "deploy")] {
931 /// # use hydro_lang::prelude::*;
932 /// # use futures::StreamExt;
933 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
934 /// let tick = process.tick();
935 /// // ticks are lazy by default, forces the second tick to run
936 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
937 ///
938 /// let batch_first_tick = process
939 /// .source_iter(q!(vec![1]))
940 /// .batch(&tick, nondet!(/** test */));
941 /// let batch_second_tick = process
942 /// .source_iter(q!(vec![1, 2, 3]))
943 /// .batch(&tick, nondet!(/** test */))
944 /// .defer_tick(); // appears on the second tick
945 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
946 /// batch_first_tick.chain(batch_second_tick).count()
947 /// .filter_if_none(some_on_first_tick)
948 /// .all_ticks()
949 /// # }, |mut stream| async move {
950 /// // [3]
951 /// # for w in vec![3] {
952 /// # assert_eq!(stream.next().await.unwrap(), w);
953 /// # }
954 /// # }));
955 /// # }
956 /// ```
957 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
958 pub fn filter_if_none<U>(
959 self,
960 other: Optional<U, L, B>,
961 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
962 where
963 B: IsBounded,
964 {
965 self.filter_if(other.is_none())
966 }
967
968 /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
969 ///
970 /// # Example
971 /// ```rust
972 /// # #[cfg(feature = "deploy")] {
973 /// # use hydro_lang::prelude::*;
974 /// # use futures::StreamExt;
975 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
976 /// let tick = process.tick();
977 /// let a = tick.singleton(q!(5));
978 /// let b = tick.singleton(q!(5));
979 /// a.equals(b).all_ticks()
980 /// # }, |mut stream| async move {
981 /// // [true]
982 /// # assert_eq!(stream.next().await.unwrap(), true);
983 /// # }));
984 /// # }
985 /// ```
986 pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
987 where
988 T: PartialEq,
989 B: IsBounded,
990 {
991 self.zip(other).map(q!(|(a, b)| a == b))
992 }
993
/// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
/// greater than or equal to the provided threshold. The event will have the value of the
/// given threshold.
///
/// This requires the incoming singleton to be monotonic, because otherwise the detection of
/// the threshold would be non-deterministic.
///
/// Two strategies are used depending on whether this singleton is bounded:
/// - when it is bounded, the value is compared against the threshold once;
/// - otherwise, the remaining threshold is kept as state across slices and each new snapshot
///   of the value is checked against it.
///
/// # Example
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let a = // singleton 1 ~> 5 ~> 10
/// # process.singleton(q!(5));
/// let b = process.singleton(q!(4));
/// a.threshold_greater_or_equal(b)
/// # }, |mut stream| async move {
/// // [4]
/// # assert_eq!(stream.next().await.unwrap(), 4);
/// # }));
/// # }
/// ```
pub fn threshold_greater_or_equal<B2: IsBounded>(
    self,
    threshold: Singleton<T, L, B2>,
) -> Stream<T, L, B::UnderlyingBound>
where
    T: Clone + PartialOrd,
    B: IsMonotonic,
{
    // `B2: IsBounded`, so the threshold can be re-typed as a plain `Bounded` singleton.
    let threshold = threshold.make_bounded();
    // Grab the location up front: `self` is consumed by `try_make_bounded` below.
    let self_location = self.location().clone();
    match self.try_make_bounded() {
        Ok(bounded) => {
            // Bounded case: compare once, emitting the threshold `t` unless the value `m`
            // is strictly below it.
            let uncasted = threshold
                .zip(bounded)
                .into_stream()
                .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));

            // Re-wrap the same IR node under the declared return type.
            Stream::new(
                uncasted.location.clone(),
                uncasted.ir_node.replace(HydroNode::Placeholder),
            )
        }
        Err(me) => {
            // Unbounded case: `remaining_threshold` is slice state seeded with the threshold.
            // Each slice pairs it with a snapshot of the value and partitions on `m < t`:
            // the "not passed" side is carried over as the next state, the "passed" side is
            // emitted. NOTE(review): once the threshold has passed, the state is presumably
            // empty so nothing further is emitted — confirm against `sliced!` semantics.
            let uncasted = sliced! {
                let me = use(me, nondet!(/** thresholds are deterministic */));
                let mut remaining_threshold = use::state(|l| {
                    let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
                    as_option
                });

                let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
                remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
                passed.map(q!(|(t, _)| t))
            };

            // Re-wrap the same IR node under the declared return type, at the original
            // location of this singleton.
            Stream::new(
                self_location,
                uncasted.ir_node.replace(HydroNode::Placeholder),
            )
        }
    }
}
1059
1060 /// An operator which allows you to "name" a `HydroNode`.
1061 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1062 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
1063 {
1064 let mut node = self.ir_node.borrow_mut();
1065 let metadata = node.metadata_mut();
1066 metadata.tag = Some(name.to_owned());
1067 }
1068 self
1069 }
1070}
1071
1072impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
1073 type Output = Singleton<bool, L, B::UnderlyingBound>;
1074
1075 fn not(self) -> Self::Output {
1076 self.map(q!(|b| !b))
1077 }
1078}
1079
1080impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
1081where
1082 L: Location<'a>,
1083{
1084 /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
1085 /// the inner `Option`.
1086 ///
1087 /// This is implemented as an identity [`Singleton::filter_map`], passing through the
1088 /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
1089 /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
1090 ///
1091 /// # Example
1092 /// ```rust
1093 /// # #[cfg(feature = "deploy")] {
1094 /// # use hydro_lang::prelude::*;
1095 /// # use futures::StreamExt;
1096 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1097 /// let tick = process.tick();
1098 /// let singleton = tick.singleton(q!(Some(42)));
1099 /// singleton.into_optional().all_ticks()
1100 /// # }, |mut stream| async move {
1101 /// // 42
1102 /// # assert_eq!(stream.next().await.unwrap(), 42);
1103 /// # }));
1104 /// # }
1105 /// ```
1106 pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
1107 self.filter_map(q!(|v| v))
1108 }
1109}
1110
1111impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1112where
1113 L: Location<'a>,
1114{
1115 /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1116 ///
1117 /// # Example
1118 /// ```rust
1119 /// # #[cfg(feature = "deploy")] {
1120 /// # use hydro_lang::prelude::*;
1121 /// # use futures::StreamExt;
1122 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1123 /// let tick = process.tick();
1124 /// // ticks are lazy by default, forces the second tick to run
1125 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1126 ///
1127 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1128 /// let b = tick.singleton(q!(true)); // true, true
1129 /// a.and(b).all_ticks()
1130 /// # }, |mut stream| async move {
1131 /// // [true, false]
1132 /// # for w in vec![true, false] {
1133 /// # assert_eq!(stream.next().await.unwrap(), w);
1134 /// # }
1135 /// # }));
1136 /// # }
1137 /// ```
1138 pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1139 where
1140 B: IsBounded,
1141 {
1142 self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1143 }
1144
1145 /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1146 ///
1147 /// # Example
1148 /// ```rust
1149 /// # #[cfg(feature = "deploy")] {
1150 /// # use hydro_lang::prelude::*;
1151 /// # use futures::StreamExt;
1152 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1153 /// let tick = process.tick();
1154 /// // ticks are lazy by default, forces the second tick to run
1155 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1156 ///
1157 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1158 /// let b = tick.singleton(q!(false)); // false, false
1159 /// a.or(b).all_ticks()
1160 /// # }, |mut stream| async move {
1161 /// // [true, false]
1162 /// # for w in vec![true, false] {
1163 /// # assert_eq!(stream.next().await.unwrap(), w);
1164 /// # }
1165 /// # }));
1166 /// # }
1167 /// ```
1168 pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1169 where
1170 B: IsBounded,
1171 {
1172 self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1173 }
1174}
1175
1176impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1177where
1178 L: Location<'a>,
1179{
1180 /// Returns a singleton value corresponding to the latest snapshot of the singleton
1181 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1182 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1183 /// all snapshots of this singleton into the atomic-associated tick will observe the
1184 /// same value each tick.
1185 ///
1186 /// # Non-Determinism
1187 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1188 /// the output singleton has a non-deterministic value since the snapshot can be at an
1189 /// arbitrary point in time.
1190 pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1191 self,
1192 tick: &Tick<L2>,
1193 _nondet: NonDet,
1194 ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1195 Singleton::new(
1196 tick.drop_consistency(),
1197 HydroNode::Batch {
1198 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1199 metadata: tick
1200 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1201 },
1202 )
1203 }
1204
1205 /// Returns this singleton back into a top-level, asynchronous execution context where updates
1206 /// to the value will be asynchronously propagated.
1207 pub fn end_atomic(self) -> Singleton<T, L, B> {
1208 Singleton::new(
1209 self.location.tick.l.clone(),
1210 HydroNode::EndAtomic {
1211 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1212 metadata: self
1213 .location
1214 .tick
1215 .l
1216 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1217 },
1218 )
1219 }
1220}
1221
1222impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1223where
1224 L: Location<'a>,
1225{
1226 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1227 /// will observe the same version of the value and will be executed synchronously before any
1228 /// outputs are yielded (in [`Optional::end_atomic`]).
1229 ///
1230 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1231 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1232 /// a different version).
1233 pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1234 let id = self.location.flow_state().borrow_mut().next_clock_id();
1235 let out_location = Atomic {
1236 tick: Tick {
1237 id,
1238 l: self.location.clone(),
1239 },
1240 };
1241 Singleton::new(
1242 out_location.clone(),
1243 HydroNode::BeginAtomic {
1244 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1245 metadata: out_location
1246 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1247 },
1248 )
1249 }
1250
1251 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1252 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1253 /// relevant data that contributed to the snapshot at tick `t`.
1254 ///
1255 /// # Non-Determinism
1256 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1257 /// the output singleton has a non-deterministic value since the snapshot can be at an
1258 /// arbitrary point in time.
1259 pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1260 self,
1261 tick: &Tick<L2>,
1262 _nondet: NonDet,
1263 ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1264 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1265 Singleton::new(
1266 tick.drop_consistency(),
1267 HydroNode::Batch {
1268 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1269 metadata: tick
1270 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1271 },
1272 )
1273 }
1274
1275 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1276 /// with order corresponding to increasing prefixes of data contributing to the singleton.
1277 ///
1278 /// # Non-Determinism
1279 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1280 /// to non-deterministic batching and arrival of inputs, the output stream is
1281 /// non-deterministic.
1282 pub fn sample_eager(
1283 self,
1284 nondet: NonDet,
1285 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1286 sliced! {
1287 let snapshot = use(self, nondet);
1288 snapshot.into_stream()
1289 }
1290 .weaken_retries()
1291 }
1292
1293 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1294 /// value taken at various points in time. Because the input singleton may be
1295 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1296 /// represent the value of the singleton given some prefix of the streams leading up to
1297 /// it.
1298 ///
1299 /// # Non-Determinism
1300 /// The output stream is non-deterministic in which elements are sampled, since this
1301 /// is controlled by a clock.
1302 pub fn sample_every(
1303 self,
1304 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1305 nondet: NonDet,
1306 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1307 where
1308 L: TopLevel<'a>,
1309 {
1310 let samples = self.location.source_interval(interval);
1311 sliced! {
1312 let snapshot = use(self, nondet);
1313 let sample_batch = use(samples, nondet);
1314
1315 snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1316 }
1317 .weaken_retries()
1318 }
1319
1320 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1321 /// implies that `B == Bounded`.
1322 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1323 where
1324 B: IsBounded,
1325 {
1326 Singleton::new(
1327 self.location.clone(),
1328 self.ir_node.replace(HydroNode::Placeholder),
1329 )
1330 }
1331
1332 #[expect(clippy::result_large_err, reason = "internal use only")]
1333 fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1334 if B::UnderlyingBound::BOUNDED {
1335 Ok(Singleton::new(
1336 self.location.clone(),
1337 self.ir_node.replace(HydroNode::Placeholder),
1338 ))
1339 } else {
1340 Err(self)
1341 }
1342 }
1343
1344 /// Clones this bounded singleton into a tick, returning a singleton that has the
1345 /// same value as the outer singleton. Because the outer singleton is bounded, this
1346 /// is deterministic because there is only a single immutable version.
1347 pub fn clone_into_tick<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1348 self,
1349 tick: &Tick<L2>,
1350 ) -> Singleton<T, Tick<L2>, Bounded>
1351 where
1352 B: IsBounded,
1353 T: Clone,
1354 {
1355 // TODO(shadaj): avoid printing simulator logs for this snapshot
1356 let inner = self.snapshot(
1357 tick,
1358 nondet!(/** bounded top-level singleton so deterministic */),
1359 );
1360 Singleton::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1361 }
1362
1363 /// Converts this singleton into a [`Stream`] containing a single element, the value.
1364 ///
1365 /// # Example
1366 /// ```rust
1367 /// # #[cfg(feature = "deploy")] {
1368 /// # use hydro_lang::prelude::*;
1369 /// # use futures::StreamExt;
1370 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1371 /// let tick = process.tick();
1372 /// let batch_input = process
1373 /// .source_iter(q!(vec![123, 456]))
1374 /// .batch(&tick, nondet!(/** test */));
1375 /// batch_input.clone().chain(
1376 /// batch_input.count().into_stream()
1377 /// ).all_ticks()
1378 /// # }, |mut stream| async move {
1379 /// // [123, 456, 2]
1380 /// # for w in vec![123, 456, 2] {
1381 /// # assert_eq!(stream.next().await.unwrap(), w);
1382 /// # }
1383 /// # }));
1384 /// # }
1385 /// ```
1386 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1387 where
1388 B: IsBounded,
1389 {
1390 Stream::new(
1391 self.location.clone(),
1392 HydroNode::Cast {
1393 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1394 metadata: self.location.new_node_metadata(Stream::<
1395 T,
1396 Tick<L>,
1397 Bounded,
1398 TotalOrder,
1399 ExactlyOnce,
1400 >::collection_kind()),
1401 },
1402 )
1403 }
1404
1405 /// Resolves the singleton's [`Future`] value by blocking until it completes,
1406 /// producing a singleton of the resolved output.
1407 ///
1408 /// This is useful when the singleton contains an async computation that must
1409 /// be awaited before further processing. The future is polled to completion
1410 /// before the output value is emitted.
1411 ///
1412 /// # Example
1413 /// ```rust
1414 /// # #[cfg(feature = "deploy")] {
1415 /// # use hydro_lang::prelude::*;
1416 /// # use futures::StreamExt;
1417 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1418 /// let tick = process.tick();
1419 /// let singleton = tick.singleton(q!(5));
1420 /// singleton
1421 /// .map(q!(|v| async move { v * 2 }))
1422 /// .resolve_future_blocking()
1423 /// .all_ticks()
1424 /// # }, |mut stream| async move {
1425 /// // 10
1426 /// # assert_eq!(stream.next().await.unwrap(), 10);
1427 /// # }));
1428 /// # }
1429 /// ```
1430 pub fn resolve_future_blocking(
1431 self,
1432 ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1433 where
1434 T: Future,
1435 B: IsBounded,
1436 {
1437 Singleton::new(
1438 self.location.clone(),
1439 HydroNode::ResolveFuturesBlocking {
1440 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1441 metadata: self
1442 .location
1443 .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1444 },
1445 )
1446 }
1447}
1448
1449impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1450where
1451 L: Location<'a>,
1452{
1453 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1454 /// which will stream the value computed in _each_ tick as a separate stream element.
1455 ///
1456 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1457 /// producing one element in the output for each tick. This is useful for batched computations,
1458 /// where the results from each tick must be combined together.
1459 ///
1460 /// # Example
1461 /// ```rust
1462 /// # #[cfg(feature = "deploy")] {
1463 /// # use hydro_lang::prelude::*;
1464 /// # use futures::StreamExt;
1465 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1466 /// let tick = process.tick();
1467 /// # // ticks are lazy by default, forces the second tick to run
1468 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1469 /// # let batch_first_tick = process
1470 /// # .source_iter(q!(vec![1]))
1471 /// # .batch(&tick, nondet!(/** test */));
1472 /// # let batch_second_tick = process
1473 /// # .source_iter(q!(vec![1, 2, 3]))
1474 /// # .batch(&tick, nondet!(/** test */))
1475 /// # .defer_tick(); // appears on the second tick
1476 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1477 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1478 /// .count()
1479 /// .all_ticks()
1480 /// # }, |mut stream| async move {
1481 /// // [1, 3]
1482 /// # for w in vec![1, 3] {
1483 /// # assert_eq!(stream.next().await.unwrap(), w);
1484 /// # }
1485 /// # }));
1486 /// # }
1487 /// ```
1488 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1489 self.into_stream().all_ticks()
1490 }
1491
1492 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1493 /// which will stream the value computed in _each_ tick as a separate stream element.
1494 ///
1495 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1496 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1497 /// singleton's [`Tick`] context.
1498 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1499 self.into_stream().all_ticks_atomic()
1500 }
1501
1502 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1503 /// be asynchronously updated with the latest value of the singleton inside the tick.
1504 ///
1505 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1506 /// tick that tracks the inner value. This is useful for getting the value as of the
1507 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1508 ///
1509 /// # Example
1510 /// ```rust
1511 /// # #[cfg(feature = "deploy")] {
1512 /// # use hydro_lang::prelude::*;
1513 /// # use futures::StreamExt;
1514 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1515 /// let tick = process.tick();
1516 /// # // ticks are lazy by default, forces the second tick to run
1517 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1518 /// # let batch_first_tick = process
1519 /// # .source_iter(q!(vec![1]))
1520 /// # .batch(&tick, nondet!(/** test */));
1521 /// # let batch_second_tick = process
1522 /// # .source_iter(q!(vec![1, 2, 3]))
1523 /// # .batch(&tick, nondet!(/** test */))
1524 /// # .defer_tick(); // appears on the second tick
1525 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1526 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1527 /// .count()
1528 /// .latest()
1529 /// # .sample_eager(nondet!(/** test */))
1530 /// # }, |mut stream| async move {
1531 /// // asynchronously changes from 1 ~> 3
1532 /// # for w in vec![1, 3] {
1533 /// # assert_eq!(stream.next().await.unwrap(), w);
1534 /// # }
1535 /// # }));
1536 /// # }
1537 /// ```
1538 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1539 Singleton::new(
1540 self.location.outer().clone(),
1541 HydroNode::YieldConcat {
1542 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1543 metadata: self
1544 .location
1545 .outer()
1546 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1547 },
1548 )
1549 }
1550
1551 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1552 /// be updated with the latest value of the singleton inside the tick.
1553 ///
1554 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1555 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1556 /// singleton's [`Tick`] context.
1557 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1558 let out_location = Atomic {
1559 tick: self.location.clone(),
1560 };
1561 Singleton::new(
1562 out_location.clone(),
1563 HydroNode::YieldConcat {
1564 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1565 metadata: out_location
1566 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1567 },
1568 )
1569 }
1570}
1571
1572#[doc(hidden)]
1573/// Helper trait that determines the output collection type for [`Singleton::zip`].
1574///
1575/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1576/// [`Singleton`].
1577#[sealed::sealed]
1578pub trait ZipResult<'a, Other> {
1579 /// The output collection type.
1580 type Out;
1581 /// The type of the tupled output value.
1582 type ElementType;
1583 /// The type of the other collection's value.
1584 type OtherType;
1585 /// The location where the tupled result will be materialized.
1586 type Location: Location<'a>;
1587
1588 /// The location of the second input to the `zip`.
1589 fn other_location(other: &Other) -> Self::Location;
1590 /// The IR node of the second input to the `zip`.
1591 fn other_ir_node(other: Other) -> HydroNode;
1592
1593 /// Constructs the output live collection given an IR node containing the zip result.
1594 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1595}
1596
1597#[sealed::sealed]
1598impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1599where
1600 L: Location<'a>,
1601{
1602 type Out = Singleton<(T, U), L, B>;
1603 type ElementType = (T, U);
1604 type OtherType = U;
1605 type Location = L;
1606
1607 fn other_location(other: &Singleton<U, L, B>) -> L {
1608 other.location.clone()
1609 }
1610
1611 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1612 other.ir_node.replace(HydroNode::Placeholder)
1613 }
1614
1615 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1616 Singleton::new(
1617 location.clone(),
1618 HydroNode::Cast {
1619 inner: Box::new(ir_node),
1620 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1621 },
1622 )
1623 }
1624}
1625
1626#[sealed::sealed]
1627impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1628 for Singleton<T, L, B>
1629where
1630 L: Location<'a>,
1631{
1632 type Out = Optional<(T, U), L, B::UnderlyingBound>;
1633 type ElementType = (T, U);
1634 type OtherType = U;
1635 type Location = L;
1636
1637 fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1638 other.location.clone()
1639 }
1640
1641 fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1642 other.ir_node.replace(HydroNode::Placeholder)
1643 }
1644
1645 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1646 Optional::new(location, ir_node)
1647 }
1648}
1649
1650#[cfg(test)]
1651mod tests {
1652 #[cfg(feature = "deploy")]
1653 use futures::{SinkExt, StreamExt};
1654 #[cfg(feature = "deploy")]
1655 use hydro_deploy::Deployment;
1656 #[cfg(any(feature = "deploy", feature = "sim"))]
1657 use stageleft::q;
1658
1659 #[cfg(any(feature = "deploy", feature = "sim"))]
1660 use crate::compile::builder::FlowBuilder;
1661 #[cfg(feature = "deploy")]
1662 use crate::live_collections::stream::ExactlyOnce;
1663 #[cfg(any(feature = "deploy", feature = "sim"))]
1664 use crate::location::Location;
1665 #[cfg(any(feature = "deploy", feature = "sim"))]
1666 use crate::nondet::nondet;
1667
// Deploys a single process whose tick carries a singleton through a tick cycle, and checks
// that counting the stream form of that singleton yields 1 on each triggered tick.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn tick_cycle_cardinality() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    // External input used purely as a trigger: each message lets one count through below.
    let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

    let node_tick = node.tick();
    // Tick cycle seeded with the singleton `0`; it is closed further down by feeding the
    // singleton back into the next tick.
    let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
    let counts = singleton
        .clone()
        .into_stream()
        .count()
        // Only emit the count in ticks where the trigger batch is non-empty.
        .filter_if(
            input
                .batch(&node_tick, nondet!(/** testing */))
                .first()
                .is_some(),
        )
        .all_ticks()
        .send_bincode_external(&external);
    complete_cycle.complete_next_tick(singleton);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut tick_trigger = nodes.connect(input_send).await;
    let mut external_out = nodes.connect(counts).await;

    deployment.start().await.unwrap();

    // First trigger: the count of the cycled singleton is expected to be 1.
    tick_trigger.send(()).await.unwrap();

    assert_eq!(external_out.next().await.unwrap(), 1);

    // Second trigger: the count is expected to still be 1 after going around the cycle.
    tick_trigger.send(()).await.unwrap();

    assert_eq!(external_out.next().await.unwrap(), 1);
}
1715
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_intermediate_states() {
    // Expected to panic: asserting that the first observed snapshot of the fold already is
    // the final sum should presumably fail in some explored execution, where an intermediate
    // state is snapshotted first.
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshot.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        assert_eq!(out_recv.next().await.unwrap(), 10);
    });
}
1734
#[cfg(feature = "sim")]
#[test]
fn sim_fold_intermediate_state_count() {
    // Checks both that every execution ends on the final sum and how many executions the
    // exhaustive simulation explores for snapshots of a four-element fold.
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshot.all_ticks().sim_output();

    let explored = builder.sim().exhaustive(async || {
        let observed = out_recv.collect::<Vec<_>>().await;
        assert_eq!(observed.last(), Some(&10));
    });

    assert_eq!(
        explored,
        16 // 2^4 possible subsets of intermediates (including initial state)
    )
}
1758
#[cfg(feature = "sim")]
#[test]
fn sim_fold_no_repeat_initial() {
    // check that we don't repeat the initial state of the fold in autonomous decisions

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_port, incoming) = process.sim_input();
    let running_sum = incoming.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshot.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        // Before any input is sent, the only observable value is the fold's initial state.
        assert_eq!(out_recv.next().await.unwrap(), 0);

        in_port.send(123);

        // The very next value must reflect the input rather than the initial state again.
        assert_eq!(out_recv.next().await.unwrap(), 123);
    });
}
1782
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_repeats_snapshots() {
    // when the tick is driven by a snapshot AND something else, the snapshot can
    // "stutter" and repeat the same state multiple times

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let paired = numbers
        .batch(&tick, nondet!(/** test */))
        .cross_singleton(running_sum.snapshot(&tick, nondet!(/** test */)));
    let out_recv = paired.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        // The second element is only pulled when the first one matches (short-circuit).
        let first = out_recv.next().await.unwrap();
        if first == (1, 3) && out_recv.next().await.unwrap() == (2, 3) {
            panic!("repeated snapshot");
        }
    });
}
1809
// Companion to `sim_fold_repeats_snapshots`: pins the number of executions explored when a
// tick consumes both a batch of the source and a snapshot of the fold over that source.
#[cfg(feature = "sim")]
#[test]
fn sim_fold_repeats_snapshots_count() {
    // check the number of instances
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
    let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let batch = source
        .batch(&tick, nondet!(/** test */))
        .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
    let out_recv = batch.all_ticks().sim_output();

    // The outputs themselves are irrelevant here; the stream is only drained so that each
    // execution runs to completion.
    let count = flow.sim().exhaustive(async || {
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    assert_eq!(count, 52);
    // don't have a combinatorial explanation for this number yet, but checked via logs
}
1833
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_exhaustive() {
    // A top-level singleton should have exactly one snapshot, so the exhaustive run is
    // expected to report a count of one.
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let constant = process.singleton(q!(1));
    let tick = process.tick();
    let out_recv = constant
        .snapshot(&tick, nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        // Drain every output; the values themselves are not inspected.
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    assert_eq!(explored, 1);
}
1852
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_join_count() {
    // When a tick consumes a static snapshot together with a stream batch, only the
    // batch requires state-space exploration.

    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let items = process.source_iter(q!(vec![1, 2, 3, 4]));
    let tick = process.tick();
    // Build the batch before the in-tick singleton, matching the original call order.
    let batched = items.batch(&tick, nondet!(/** test */));
    let constant_in_tick = process.singleton(q!(123)).clone_into_tick(&tick);
    let out_recv = batched
        .cross_singleton(constant_in_tick)
        .all_ticks()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        // Drain every output; the values themselves are not inspected.
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    // 2^4 ways to split up the four elements (including a possibly empty first batch).
    assert_eq!(explored, 16);
}
1878
#[cfg(feature = "sim")]
#[test]
fn top_level_singleton_into_stream_no_replay() {
    // Turning a top-level fold into a stream must yield only the final sum (10) in
    // every explored execution.
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let total = process
        .source_iter(q!(vec![1, 2, 3, 4]))
        .fold(q!(|| 0), q!(|a, b| *a += b));
    let out_recv = total.into_stream().sim_output();

    flow.sim().exhaustive(async || {
        out_recv.assert_yields_only([10]).await;
    });
}
1894
#[cfg(feature = "sim")]
#[test]
fn inside_tick_singleton_zip() {
    use crate::live_collections::{Stream, sliced::sliced};

    // Zips a sliced view of a fold with itself and checks that the last emitted pair
    // holds the final sum on both sides.
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let numbers: Stream<_, _> = process.source_iter(q!(vec![1, 2])).into();
    let folded = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let out_recv = sliced! {
        let v = use(folded, nondet!(/** test */));
        v.clone().zip(v).into_stream()
    }
    .sim_output();

    let explored = flow.sim().exhaustive(async || {
        let pairs = out_recv.collect::<Vec<_>>().await;
        // 1 + 2 = 3 on both sides of the final zipped value.
        assert_eq!(pairs.last(), Some(&(3, 3)));
    });

    assert_eq!(explored, 4);
}
1920
/// Reproducer for a simulator hang: `cross_singleton` applied to a top-level
/// unbounded stream (outside of `sliced!`) made the exhaustive simulator hang
/// after its first iteration.
#[cfg(feature = "sim")]
#[test]
fn sim_cross_singleton_top_level_unbounded_hang() {
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let (cmd_port, input) = process.sim_input::<String, _, _>();

    let constant = process.singleton(q!(123));

    // `cross_singleton` directly on the top-level stream is what triggered the bug;
    // the crossed stream is sent straight to the output.
    let resp_port = input.cross_singleton(constant).sim_output();

    let explored = flow.sim().exhaustive(async || {
        cmd_port.send(String::from("abc"));

        let responses = resp_port.collect::<Vec<_>>().await;
        assert!(!responses.is_empty());
    });

    assert_eq!(explored, 1);
}
1949
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_state_count() {
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let (cmd_port, input) = process.sim_input();
    {
        // Converting a top-level singleton to `Unbounded` used to raise the exhaustive
        // count from 1 to 2, before the `From` conversion was optimized.
        use super::Singleton;
        use crate::live_collections::boundedness::Unbounded;
        let _singleton: Singleton<_, _, Unbounded> = process.singleton(q!(false)).into();
    }
    let tick = process.tick();
    // Batch the input into the tick and immediately release it again.
    let resp_port = input
        .batch(&tick, nondet!(/** */))
        .all_ticks()
        .sim_output();

    let explored = flow.sim().exhaustive(async || {
        cmd_port.send(());
        let _responses = resp_port.collect::<Vec<_>>().await;
    });

    assert_eq!(explored, 1);
}
1974}