hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, NoAtomic};
25use crate::location::{Location, NoTick, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28 ApplyMonotoneStream, ApplyOrderPreservingSingleton, MapFuncAlgebra, Proved,
29};
30
31/// A marker trait indicating which components of a [`Singleton`] may change.
32///
33/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
34/// includes an additional variant [`Monotonic`], which means that the value will only grow.
35pub trait SingletonBound {
36 /// The [`Boundedness`] that this [`Singleton`] would be erased to.
37 type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;
38
39 /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
40 type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;
41
42 /// Returns the [`SingletonBoundKind`] corresponding to this type.
43 fn bound_kind() -> SingletonBoundKind;
44}
45
46impl SingletonBound for Unbounded {
47 type UnderlyingBound = Unbounded;
48
49 type StreamToMonotone = Monotonic;
50
51 fn bound_kind() -> SingletonBoundKind {
52 SingletonBoundKind::Unbounded
53 }
54}
55
56impl SingletonBound for Bounded {
57 type UnderlyingBound = Bounded;
58
59 type StreamToMonotone = Bounded;
60
61 fn bound_kind() -> SingletonBoundKind {
62 SingletonBoundKind::Bounded
63 }
64}
65
66/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
67pub struct Monotonic;
68
69impl SingletonBound for Monotonic {
70 type UnderlyingBound = Unbounded;
71
72 type StreamToMonotone = Monotonic;
73
74 fn bound_kind() -> SingletonBoundKind {
75 SingletonBoundKind::Monotonic
76 }
77}
78
79#[sealed]
80#[diagnostic::on_unimplemented(
81 message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
82 label = "required here",
83 note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
84)]
85/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee.
86pub trait IsMonotonic: SingletonBound {}
87
88#[sealed]
89#[diagnostic::do_not_recommend]
90impl IsMonotonic for Monotonic {}
91
92#[sealed]
93#[diagnostic::do_not_recommend]
94impl<B: IsBounded> IsMonotonic for B {}
95
96/// A single Rust value that can asynchronously change over time.
97///
98/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
99/// [`Unbounded`], the value will asynchronously change over time.
100///
101/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
102/// a single number that will asynchronously change as events are processed. Singletons also appear
103/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
104/// such as getting the length of a batch of requests.
105///
106/// Type Parameters:
107/// - `Type`: the type of the value in this singleton
108/// - `Loc`: the [`Location`] where the singleton is materialized
109/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
110pub struct Singleton<Type, Loc, Bound: SingletonBound> {
111 pub(crate) location: Loc,
112 pub(crate) ir_node: RefCell<HydroNode>,
113 pub(crate) flow_state: FlowState,
114
115 _phantom: PhantomData<(Type, Loc, Bound)>,
116}
117
118impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
119 fn drop(&mut self) {
120 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
121 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
122 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
123 input: Box::new(ir_node),
124 op_metadata: HydroIrOpMetadata::new(),
125 });
126 }
127 }
128}
129
130impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
131where
132 T: Clone,
133 L: Location<'a> + NoTick,
134{
135 fn from(value: Singleton<T, L, Bounded>) -> Self {
136 let tick = value.location().tick();
137 value.clone_into_tick(&tick).latest()
138 }
139}
140
141impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
142where
143 L: Location<'a>,
144{
145 type Location = Tick<L>;
146
147 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
148 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
149 location.clone(),
150 HydroNode::DeferTick {
151 input: Box::new(HydroNode::CycleSource {
152 cycle_id,
153 metadata: location.new_node_metadata(Self::collection_kind()),
154 }),
155 metadata: location
156 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
157 },
158 );
159
160 from_previous_tick.unwrap_or(initial)
161 }
162}
163
164impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
165where
166 L: Location<'a>,
167{
168 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
169 assert_eq!(
170 Location::id(&self.location),
171 expected_location,
172 "locations do not match"
173 );
174 self.location
175 .flow_state()
176 .borrow_mut()
177 .push_root(HydroRoot::CycleSink {
178 cycle_id,
179 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
180 op_metadata: HydroIrOpMetadata::new(),
181 });
182 }
183}
184
185impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
186where
187 L: Location<'a>,
188{
189 type Location = Tick<L>;
190
191 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
192 Singleton::new(
193 location.clone(),
194 HydroNode::CycleSource {
195 cycle_id,
196 metadata: location.new_node_metadata(Self::collection_kind()),
197 },
198 )
199 }
200}
201
202impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
203where
204 L: Location<'a>,
205{
206 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
207 assert_eq!(
208 Location::id(&self.location),
209 expected_location,
210 "locations do not match"
211 );
212 self.location
213 .flow_state()
214 .borrow_mut()
215 .push_root(HydroRoot::CycleSink {
216 cycle_id,
217 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
218 op_metadata: HydroIrOpMetadata::new(),
219 });
220 }
221}
222
223impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
224where
225 L: Location<'a> + NoTick,
226{
227 type Location = L;
228
229 fn create_source(cycle_id: CycleId, location: L) -> Self {
230 Singleton::new(
231 location.clone(),
232 HydroNode::CycleSource {
233 cycle_id,
234 metadata: location.new_node_metadata(Self::collection_kind()),
235 },
236 )
237 }
238}
239
240impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
241where
242 L: Location<'a> + NoTick,
243{
244 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
245 assert_eq!(
246 Location::id(&self.location),
247 expected_location,
248 "locations do not match"
249 );
250 self.location
251 .flow_state()
252 .borrow_mut()
253 .push_root(HydroRoot::CycleSink {
254 cycle_id,
255 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
256 op_metadata: HydroIrOpMetadata::new(),
257 });
258 }
259}
260
261impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
262where
263 T: Clone,
264 L: Location<'a>,
265{
266 fn clone(&self) -> Self {
267 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
268 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
269 *self.ir_node.borrow_mut() = HydroNode::Tee {
270 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
271 metadata: self.location.new_node_metadata(Self::collection_kind()),
272 };
273 }
274
275 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
276 Singleton {
277 location: self.location.clone(),
278 flow_state: self.flow_state.clone(),
279 ir_node: HydroNode::Tee {
280 inner: SharedNode(inner.0.clone()),
281 metadata: metadata.clone(),
282 }
283 .into(),
284 _phantom: PhantomData,
285 }
286 } else {
287 unreachable!()
288 }
289 }
290}
291
292#[cfg(stageleft_runtime)]
293fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
294 me: Singleton<T, Tick<L>, B>,
295 other: Optional<O, Tick<L>, B::UnderlyingBound>,
296) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
297 let me_as_optional: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
298 super::optional::zip_inside_tick(me_as_optional, other)
299}
300
301impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
302where
303 L: Location<'a>,
304{
305 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
306 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
307 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
308 let flow_state = location.flow_state().clone();
309 Singleton {
310 location,
311 flow_state,
312 ir_node: RefCell::new(ir_node),
313 _phantom: PhantomData,
314 }
315 }
316
317 pub(crate) fn collection_kind() -> CollectionKind {
318 CollectionKind::Singleton {
319 bound: B::bound_kind(),
320 element_type: stageleft::quote_type::<T>().into(),
321 }
322 }
323
324 /// Returns the [`Location`] where this singleton is being materialized.
325 pub fn location(&self) -> &L {
326 &self.location
327 }
328
329 /// Creates a lightweight reference handle to this singleton that can be captured
330 /// inside `q!()` closures. The handle resolves to `&T` at runtime.
331 ///
332 /// The singleton must be bounded, otherwise reading it would be non-deterministic.
333 ///
334 /// ```rust
335 /// # #[cfg(feature = "deploy")] {
336 /// # use hydro_lang::prelude::*;
337 /// # use futures::StreamExt;
338 /// # tokio_test::block_on(async {
339 /// # let mut deployment = hydro_deploy::Deployment::new();
340 /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
341 /// # let process = builder.process::<()>();
342 /// # let external = builder.external::<()>();
343 /// let my_count = process
344 /// .source_iter(q!(0..5i32))
345 /// .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
346 /// let count_ref = my_count.by_ref();
347 /// let out_port = process
348 /// .source_iter(q!(1..=3i32))
349 /// .map(q!(|x| x + *count_ref))
350 /// .send_bincode_external(&external);
351 /// # let nodes = builder
352 /// # .with_default_optimize()
353 /// # .with_process(&process, deployment.Localhost())
354 /// # .with_external(&external, deployment.Localhost())
355 /// # .deploy(&mut deployment);
356 /// # deployment.deploy().await.unwrap();
357 /// # let mut out_recv = nodes.connect(out_port).await;
358 /// # deployment.start().await.unwrap();
359 /// # let mut results = Vec::new();
360 /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
361 /// # results.sort();
362 /// // fold(0..5) = 10, so results are 11, 12, 13
363 /// # assert_eq!(results, vec![11, 12, 13]);
364 /// # });
365 /// # }
366 /// ```
367 pub fn by_ref(&self) -> crate::singleton_ref::SingletonRef<'a, T, L>
368 where
369 B: IsBounded,
370 {
371 // Wrap in HydroNode::Singleton for materialization + identity tracking.
372 // If already a Singleton node, reuse it. If a Tee (from clone), wrap inner.
373 if !matches!(&*self.ir_node.borrow(), HydroNode::Singleton { .. }) {
374 let orig = self.ir_node.replace(HydroNode::Placeholder);
375 *self.ir_node.borrow_mut() = HydroNode::Singleton {
376 inner: SharedNode(Rc::new(RefCell::new(orig))),
377 metadata: self.location.new_node_metadata(Self::collection_kind()),
378 };
379 }
380 let borrow = self.ir_node.borrow();
381 let HydroNode::Singleton { inner, .. } = &*borrow else {
382 unreachable!()
383 };
384 crate::singleton_ref::SingletonRef::new(Rc::clone(&inner.0))
385 }
386
387 /// Drops the monotonicity property of the [`Singleton`].
388 pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
389 if B::bound_kind() == B::UnderlyingBound::bound_kind() {
390 Singleton::new(
391 self.location.clone(),
392 self.ir_node.replace(HydroNode::Placeholder),
393 )
394 } else {
395 Singleton::new(
396 self.location.clone(),
397 HydroNode::Cast {
398 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
399 metadata:
400 self.location.new_node_metadata(
401 Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
402 ),
403 },
404 )
405 }
406 }
407
408 /// Transforms the singleton value by applying a function `f` to it,
409 /// continuously as the input is updated.
410 ///
411 /// # Example
412 /// ```rust
413 /// # #[cfg(feature = "deploy")] {
414 /// # use hydro_lang::prelude::*;
415 /// # use futures::StreamExt;
416 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
417 /// let tick = process.tick();
418 /// let singleton = tick.singleton(q!(5));
419 /// singleton.map(q!(|v| v * 2)).all_ticks()
420 /// # }, |mut stream| async move {
421 /// // 10
422 /// # assert_eq!(stream.next().await.unwrap(), 10);
423 /// # }));
424 /// # }
425 /// ```
426 pub fn map<U, F, OP, B2: SingletonBound>(
427 self,
428 f: impl IntoQuotedMut<'a, F, L, MapFuncAlgebra<OP>>,
429 ) -> Singleton<U, L, B2>
430 where
431 F: Fn(T) -> U + 'a,
432 B: ApplyOrderPreservingSingleton<OP, B2>,
433 {
434 let (f, proof) = f.splice_fn1_ctx_props(&self.location);
435 proof.register_proof(&f);
436 let f = f.into();
437 Singleton::new(
438 self.location.clone(),
439 HydroNode::Map {
440 f,
441 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
442 metadata: self
443 .location
444 .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
445 },
446 )
447 }
448
449 /// Transforms the singleton value by applying a function `f` to it and then flattening
450 /// the result into a stream, preserving the order of elements.
451 ///
452 /// The function `f` is applied to the singleton value to produce an iterator, and all items
453 /// from that iterator are emitted in the output stream in deterministic order.
454 ///
455 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
456 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
457 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
458 ///
459 /// # Example
460 /// ```rust
461 /// # #[cfg(feature = "deploy")] {
462 /// # use hydro_lang::prelude::*;
463 /// # use futures::StreamExt;
464 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465 /// let tick = process.tick();
466 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
467 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
468 /// # }, |mut stream| async move {
469 /// // 1, 2, 3
470 /// # for w in vec![1, 2, 3] {
471 /// # assert_eq!(stream.next().await.unwrap(), w);
472 /// # }
473 /// # }));
474 /// # }
475 /// ```
476 pub fn flat_map_ordered<U, I, F>(
477 self,
478 f: impl IntoQuotedMut<'a, F, L>,
479 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
480 where
481 B: IsBounded,
482 I: IntoIterator<Item = U>,
483 F: Fn(T) -> I + 'a,
484 {
485 self.into_stream().flat_map_ordered(f)
486 }
487
488 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
489 /// for the output type `I` to produce items in any order.
490 ///
491 /// The function `f` is applied to the singleton value to produce an iterator, and all items
492 /// from that iterator are emitted in the output stream in non-deterministic order.
493 ///
494 /// # Example
495 /// ```rust
496 /// # #[cfg(feature = "deploy")] {
497 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
498 /// # use futures::StreamExt;
499 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
500 /// let tick = process.tick();
501 /// let singleton = tick.singleton(q!(
502 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
503 /// ));
504 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
505 /// # }, |mut stream| async move {
506 /// // 1, 2, 3, but in no particular order
507 /// # let mut results = Vec::new();
508 /// # for _ in 0..3 {
509 /// # results.push(stream.next().await.unwrap());
510 /// # }
511 /// # results.sort();
512 /// # assert_eq!(results, vec![1, 2, 3]);
513 /// # }));
514 /// # }
515 /// ```
516 pub fn flat_map_unordered<U, I, F>(
517 self,
518 f: impl IntoQuotedMut<'a, F, L>,
519 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
520 where
521 B: IsBounded,
522 I: IntoIterator<Item = U>,
523 F: Fn(T) -> I + 'a,
524 {
525 self.into_stream().flat_map_unordered(f)
526 }
527
528 /// Flattens the singleton value into a stream, preserving the order of elements.
529 ///
530 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
531 /// are emitted in the output stream in deterministic order.
532 ///
533 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
534 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
535 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
536 ///
537 /// # Example
538 /// ```rust
539 /// # #[cfg(feature = "deploy")] {
540 /// # use hydro_lang::prelude::*;
541 /// # use futures::StreamExt;
542 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
543 /// let tick = process.tick();
544 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
545 /// singleton.flatten_ordered().all_ticks()
546 /// # }, |mut stream| async move {
547 /// // 1, 2, 3
548 /// # for w in vec![1, 2, 3] {
549 /// # assert_eq!(stream.next().await.unwrap(), w);
550 /// # }
551 /// # }));
552 /// # }
553 /// ```
554 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
555 where
556 B: IsBounded,
557 T: IntoIterator<Item = U>,
558 {
559 self.flat_map_ordered(q!(|x| x))
560 }
561
562 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
563 /// for the element type `T` to produce items in any order.
564 ///
565 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
566 /// are emitted in the output stream in non-deterministic order.
567 ///
568 /// # Example
569 /// ```rust
570 /// # #[cfg(feature = "deploy")] {
571 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
572 /// # use futures::StreamExt;
573 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
574 /// let tick = process.tick();
575 /// let singleton = tick.singleton(q!(
576 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
577 /// ));
578 /// singleton.flatten_unordered().all_ticks()
579 /// # }, |mut stream| async move {
580 /// // 1, 2, 3, but in no particular order
581 /// # let mut results = Vec::new();
582 /// # for _ in 0..3 {
583 /// # results.push(stream.next().await.unwrap());
584 /// # }
585 /// # results.sort();
586 /// # assert_eq!(results, vec![1, 2, 3]);
587 /// # }));
588 /// # }
589 /// ```
590 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
591 where
592 B: IsBounded,
593 T: IntoIterator<Item = U>,
594 {
595 self.flat_map_unordered(q!(|x| x))
596 }
597
598 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
599 ///
600 /// If the predicate returns `true`, the output optional contains the same value.
601 /// If the predicate returns `false`, the output optional is empty.
602 ///
603 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
604 /// not modify or take ownership of the value. If you need to modify the value while filtering
605 /// use [`Singleton::filter_map`] instead.
606 ///
607 /// # Example
608 /// ```rust
609 /// # #[cfg(feature = "deploy")] {
610 /// # use hydro_lang::prelude::*;
611 /// # use futures::StreamExt;
612 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
613 /// let tick = process.tick();
614 /// let singleton = tick.singleton(q!(5));
615 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
616 /// # }, |mut stream| async move {
617 /// // 5
618 /// # assert_eq!(stream.next().await.unwrap(), 5);
619 /// # }));
620 /// # }
621 /// ```
622 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
623 where
624 F: Fn(&T) -> bool + 'a,
625 {
626 let f = f.splice_fn1_borrow_ctx(&self.location).into();
627 Optional::new(
628 self.location.clone(),
629 HydroNode::Filter {
630 f,
631 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
632 metadata: self
633 .location
634 .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
635 },
636 )
637 }
638
639 /// An operator that both filters and maps. It yields the value only if the supplied
640 /// closure `f` returns `Some(value)`.
641 ///
642 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
643 /// If the closure returns `None`, the output optional is empty.
644 ///
645 /// # Example
646 /// ```rust
647 /// # #[cfg(feature = "deploy")] {
648 /// # use hydro_lang::prelude::*;
649 /// # use futures::StreamExt;
650 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
651 /// let tick = process.tick();
652 /// let singleton = tick.singleton(q!("42"));
653 /// singleton
654 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
655 /// .all_ticks()
656 /// # }, |mut stream| async move {
657 /// // 42
658 /// # assert_eq!(stream.next().await.unwrap(), 42);
659 /// # }));
660 /// # }
661 /// ```
662 pub fn filter_map<U, F>(
663 self,
664 f: impl IntoQuotedMut<'a, F, L>,
665 ) -> Optional<U, L, B::UnderlyingBound>
666 where
667 F: Fn(T) -> Option<U> + 'a,
668 {
669 let f = f.splice_fn1_ctx(&self.location).into();
670 Optional::new(
671 self.location.clone(),
672 HydroNode::FilterMap {
673 f,
674 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
675 metadata: self
676 .location
677 .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
678 },
679 )
680 }
681
682 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
683 ///
684 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
685 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
686 /// non-null. This is useful for combining several pieces of state together.
687 ///
688 /// # Example
689 /// ```rust
690 /// # #[cfg(feature = "deploy")] {
691 /// # use hydro_lang::prelude::*;
692 /// # use futures::StreamExt;
693 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
694 /// let tick = process.tick();
695 /// let numbers = process
696 /// .source_iter(q!(vec![123, 456]))
697 /// .batch(&tick, nondet!(/** test */));
698 /// let count = numbers.clone().count(); // Singleton
699 /// let max = numbers.max(); // Optional
700 /// count.zip(max).all_ticks()
701 /// # }, |mut stream| async move {
702 /// // [(2, 456)]
703 /// # for w in vec![(2, 456)] {
704 /// # assert_eq!(stream.next().await.unwrap(), w);
705 /// # }
706 /// # }));
707 /// # }
708 /// ```
709 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
710 where
711 Self: ZipResult<'a, O, Location = L>,
712 B: IsBounded,
713 {
714 check_matching_location(&self.location, &Self::other_location(&other));
715
716 if L::is_top_level()
717 && let Some(tick) = self.location.try_tick()
718 {
719 let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
720 let out = zip_inside_tick(
721 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
722 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
723 other_location.clone(),
724 HydroNode::Cast {
725 inner: Box::new(Self::other_ir_node(other)),
726 metadata: other_location.new_node_metadata(Optional::<
727 <Self as ZipResult<'a, O>>::OtherType,
728 Tick<L>,
729 Bounded,
730 >::collection_kind(
731 )),
732 },
733 )
734 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
735 )
736 .latest();
737
738 Self::make(
739 out.location.clone(),
740 out.ir_node.replace(HydroNode::Placeholder),
741 )
742 } else {
743 Self::make(
744 self.location.clone(),
745 HydroNode::CrossSingleton {
746 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
747 right: Box::new(Self::other_ir_node(other)),
748 metadata: self.location.new_node_metadata(CollectionKind::Optional {
749 bound: B::BOUND_KIND,
750 element_type: stageleft::quote_type::<
751 <Self as ZipResult<'a, O>>::ElementType,
752 >()
753 .into(),
754 }),
755 },
756 )
757 }
758 }
759
760 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
761 /// boolean signal is `true`, otherwise the output is null.
762 ///
763 /// # Example
764 /// ```rust
765 /// # #[cfg(feature = "deploy")] {
766 /// # use hydro_lang::prelude::*;
767 /// # use futures::StreamExt;
768 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
769 /// let tick = process.tick();
770 /// // ticks are lazy by default, forces the second tick to run
771 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
772 ///
773 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
774 /// let batch_first_tick = process
775 /// .source_iter(q!(vec![1]))
776 /// .batch(&tick, nondet!(/** test */));
777 /// let batch_second_tick = process
778 /// .source_iter(q!(vec![1, 2, 3]))
779 /// .batch(&tick, nondet!(/** test */))
780 /// .defer_tick();
781 /// batch_first_tick.chain(batch_second_tick).count()
782 /// .filter_if(signal)
783 /// .all_ticks()
784 /// # }, |mut stream| async move {
785 /// // [1]
786 /// # for w in vec![1] {
787 /// # assert_eq!(stream.next().await.unwrap(), w);
788 /// # }
789 /// # }));
790 /// # }
791 /// ```
792 pub fn filter_if(
793 self,
794 signal: Singleton<bool, L, B>,
795 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
796 where
797 B: IsBounded,
798 {
799 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
800 }
801
802 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
803 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
804 ///
805 /// Useful for conditionally processing, such as only emitting a singleton's value outside
806 /// a tick if some other condition is satisfied.
807 ///
808 /// # Example
809 /// ```rust
810 /// # #[cfg(feature = "deploy")] {
811 /// # use hydro_lang::prelude::*;
812 /// # use futures::StreamExt;
813 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
814 /// let tick = process.tick();
815 /// // ticks are lazy by default, forces the second tick to run
816 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
817 ///
818 /// let batch_first_tick = process
819 /// .source_iter(q!(vec![1]))
820 /// .batch(&tick, nondet!(/** test */));
821 /// let batch_second_tick = process
822 /// .source_iter(q!(vec![1, 2, 3]))
823 /// .batch(&tick, nondet!(/** test */))
824 /// .defer_tick(); // appears on the second tick
825 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
826 /// batch_first_tick.chain(batch_second_tick).count()
827 /// .filter_if_some(some_on_first_tick)
828 /// .all_ticks()
829 /// # }, |mut stream| async move {
830 /// // [1]
831 /// # for w in vec![1] {
832 /// # assert_eq!(stream.next().await.unwrap(), w);
833 /// # }
834 /// # }));
835 /// # }
836 /// ```
837 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
838 pub fn filter_if_some<U>(
839 self,
840 signal: Optional<U, L, B>,
841 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
842 where
843 B: IsBounded,
844 {
845 self.filter_if(signal.is_some())
846 }
847
848 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
849 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
850 ///
851 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
852 /// the condition.
853 ///
854 /// # Example
855 /// ```rust
856 /// # #[cfg(feature = "deploy")] {
857 /// # use hydro_lang::prelude::*;
858 /// # use futures::StreamExt;
859 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
860 /// let tick = process.tick();
861 /// // ticks are lazy by default, forces the second tick to run
862 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
863 ///
864 /// let batch_first_tick = process
865 /// .source_iter(q!(vec![1]))
866 /// .batch(&tick, nondet!(/** test */));
867 /// let batch_second_tick = process
868 /// .source_iter(q!(vec![1, 2, 3]))
869 /// .batch(&tick, nondet!(/** test */))
870 /// .defer_tick(); // appears on the second tick
871 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
872 /// batch_first_tick.chain(batch_second_tick).count()
873 /// .filter_if_none(some_on_first_tick)
874 /// .all_ticks()
875 /// # }, |mut stream| async move {
876 /// // [3]
877 /// # for w in vec![3] {
878 /// # assert_eq!(stream.next().await.unwrap(), w);
879 /// # }
880 /// # }));
881 /// # }
882 /// ```
883 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
884 pub fn filter_if_none<U>(
885 self,
886 other: Optional<U, L, B>,
887 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
888 where
889 B: IsBounded,
890 {
891 self.filter_if(other.is_none())
892 }
893
894 /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
895 ///
896 /// # Example
897 /// ```rust
898 /// # #[cfg(feature = "deploy")] {
899 /// # use hydro_lang::prelude::*;
900 /// # use futures::StreamExt;
901 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
902 /// let tick = process.tick();
903 /// let a = tick.singleton(q!(5));
904 /// let b = tick.singleton(q!(5));
905 /// a.equals(b).all_ticks()
906 /// # }, |mut stream| async move {
907 /// // [true]
908 /// # assert_eq!(stream.next().await.unwrap(), true);
909 /// # }));
910 /// # }
911 /// ```
912 pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
913 where
914 T: PartialEq,
915 B: IsBounded,
916 {
917 self.zip(other).map(q!(|(a, b)| a == b))
918 }
919
920 /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
921 /// greater than or equal to the provided threshold. The event will have the value of the
922 /// given threshold.
923 ///
924 /// This requires the incoming singleton to be monotonic, because otherwise the detection of
925 /// the threshold would be non-deterministic.
926 ///
927 /// # Example
928 /// ```rust
929 /// # #[cfg(feature = "deploy")] {
930 /// # use hydro_lang::prelude::*;
931 /// # use futures::StreamExt;
932 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
933 /// let a = // singleton 1 ~> 5 ~> 10
934 /// # process.singleton(q!(5));
935 /// let b = process.singleton(q!(4));
936 /// a.threshold_greater_or_equal(b)
937 /// # }, |mut stream| async move {
938 /// // [4]
939 /// # assert_eq!(stream.next().await.unwrap(), 4);
940 /// # }));
941 /// # }
942 /// ```
943 pub fn threshold_greater_or_equal<B2: IsBounded>(
944 self,
945 threshold: Singleton<T, L, B2>,
946 ) -> Stream<T, L, B::UnderlyingBound>
947 where
948 T: Clone + PartialOrd,
949 B: IsMonotonic,
950 {
951 let threshold = threshold.make_bounded();
952 match self.try_make_bounded() {
953 Ok(bounded) => {
954 let uncasted = threshold
955 .zip(bounded)
956 .into_stream()
957 .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
958
959 Stream::new(
960 uncasted.location.clone(),
961 uncasted.ir_node.replace(HydroNode::Placeholder),
962 )
963 }
964 Err(me) => {
965 let uncasted = sliced! {
966 let me = use(me, nondet!(/** thresholds are deterministic */));
967 let mut remaining_threshold = use::state(|l| {
968 let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
969 as_option
970 });
971
972 let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
973 remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
974 passed.map(q!(|(t, _)| t))
975 };
976
977 Stream::new(
978 uncasted.location.clone(),
979 uncasted.ir_node.replace(HydroNode::Placeholder),
980 )
981 }
982 }
983 }
984
985 /// An operator which allows you to "name" a `HydroNode`.
986 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
987 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
988 {
989 let mut node = self.ir_node.borrow_mut();
990 let metadata = node.metadata_mut();
991 metadata.tag = Some(name.to_owned());
992 }
993 self
994 }
995}
996
997impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
998 type Output = Singleton<bool, L, B::UnderlyingBound>;
999
1000 fn not(self) -> Self::Output {
1001 self.map(q!(|b| !b))
1002 }
1003}
1004
1005impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
1006where
1007 L: Location<'a>,
1008{
1009 /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
1010 /// the inner `Option`.
1011 ///
1012 /// This is implemented as an identity [`Singleton::filter_map`], passing through the
1013 /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
1014 /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
1015 ///
1016 /// # Example
1017 /// ```rust
1018 /// # #[cfg(feature = "deploy")] {
1019 /// # use hydro_lang::prelude::*;
1020 /// # use futures::StreamExt;
1021 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1022 /// let tick = process.tick();
1023 /// let singleton = tick.singleton(q!(Some(42)));
1024 /// singleton.into_optional().all_ticks()
1025 /// # }, |mut stream| async move {
1026 /// // 42
1027 /// # assert_eq!(stream.next().await.unwrap(), 42);
1028 /// # }));
1029 /// # }
1030 /// ```
1031 pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
1032 self.filter_map(q!(|v| v))
1033 }
1034}
1035
1036impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1037where
1038 L: Location<'a>,
1039{
1040 /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1041 ///
1042 /// # Example
1043 /// ```rust
1044 /// # #[cfg(feature = "deploy")] {
1045 /// # use hydro_lang::prelude::*;
1046 /// # use futures::StreamExt;
1047 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1048 /// let tick = process.tick();
1049 /// // ticks are lazy by default, forces the second tick to run
1050 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1051 ///
1052 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1053 /// let b = tick.singleton(q!(true)); // true, true
1054 /// a.and(b).all_ticks()
1055 /// # }, |mut stream| async move {
1056 /// // [true, false]
1057 /// # for w in vec![true, false] {
1058 /// # assert_eq!(stream.next().await.unwrap(), w);
1059 /// # }
1060 /// # }));
1061 /// # }
1062 /// ```
1063 pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1064 where
1065 B: IsBounded,
1066 {
1067 self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1068 }
1069
1070 /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1071 ///
1072 /// # Example
1073 /// ```rust
1074 /// # #[cfg(feature = "deploy")] {
1075 /// # use hydro_lang::prelude::*;
1076 /// # use futures::StreamExt;
1077 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1078 /// let tick = process.tick();
1079 /// // ticks are lazy by default, forces the second tick to run
1080 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1081 ///
1082 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1083 /// let b = tick.singleton(q!(false)); // false, false
1084 /// a.or(b).all_ticks()
1085 /// # }, |mut stream| async move {
1086 /// // [true, false]
1087 /// # for w in vec![true, false] {
1088 /// # assert_eq!(stream.next().await.unwrap(), w);
1089 /// # }
1090 /// # }));
1091 /// # }
1092 /// ```
1093 pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1094 where
1095 B: IsBounded,
1096 {
1097 self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1098 }
1099}
1100
1101impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1102where
1103 L: Location<'a> + NoTick,
1104{
1105 /// Returns a singleton value corresponding to the latest snapshot of the singleton
1106 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1107 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1108 /// all snapshots of this singleton into the atomic-associated tick will observe the
1109 /// same value each tick.
1110 ///
1111 /// # Non-Determinism
1112 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1113 /// the output singleton has a non-deterministic value since the snapshot can be at an
1114 /// arbitrary point in time.
1115 pub fn snapshot_atomic(
1116 self,
1117 tick: &Tick<L>,
1118 _nondet: NonDet,
1119 ) -> Singleton<T, Tick<L>, Bounded> {
1120 Singleton::new(
1121 tick.clone(),
1122 HydroNode::Batch {
1123 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1124 metadata: tick
1125 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1126 },
1127 )
1128 }
1129
1130 /// Returns this singleton back into a top-level, asynchronous execution context where updates
1131 /// to the value will be asynchronously propagated.
1132 pub fn end_atomic(self) -> Singleton<T, L, B> {
1133 Singleton::new(
1134 self.location.tick.l.clone(),
1135 HydroNode::EndAtomic {
1136 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1137 metadata: self
1138 .location
1139 .tick
1140 .l
1141 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1142 },
1143 )
1144 }
1145}
1146
1147impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1148where
1149 L: Location<'a>,
1150{
1151 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1152 /// will observe the same version of the value and will be executed synchronously before any
1153 /// outputs are yielded (in [`Optional::end_atomic`]).
1154 ///
1155 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1156 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1157 /// a different version).
1158 pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1159 let id = self.location.flow_state().borrow_mut().next_clock_id();
1160 let out_location = Atomic {
1161 tick: Tick {
1162 id,
1163 l: self.location.clone(),
1164 },
1165 };
1166 Singleton::new(
1167 out_location.clone(),
1168 HydroNode::BeginAtomic {
1169 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1170 metadata: out_location
1171 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1172 },
1173 )
1174 }
1175
1176 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1177 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1178 /// relevant data that contributed to the snapshot at tick `t`.
1179 ///
1180 /// # Non-Determinism
1181 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1182 /// the output singleton has a non-deterministic value since the snapshot can be at an
1183 /// arbitrary point in time.
1184 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
1185 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1186 Singleton::new(
1187 tick.clone(),
1188 HydroNode::Batch {
1189 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1190 metadata: tick
1191 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1192 },
1193 )
1194 }
1195
1196 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1197 /// with order corresponding to increasing prefixes of data contributing to the singleton.
1198 ///
1199 /// # Non-Determinism
1200 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1201 /// to non-deterministic batching and arrival of inputs, the output stream is
1202 /// non-deterministic.
1203 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1204 where
1205 L: NoTick,
1206 {
1207 sliced! {
1208 let snapshot = use(self, nondet);
1209 snapshot.into_stream()
1210 }
1211 .weaken_retries()
1212 }
1213
1214 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1215 /// value taken at various points in time. Because the input singleton may be
1216 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1217 /// represent the value of the singleton given some prefix of the streams leading up to
1218 /// it.
1219 ///
1220 /// # Non-Determinism
1221 /// The output stream is non-deterministic in which elements are sampled, since this
1222 /// is controlled by a clock.
1223 pub fn sample_every(
1224 self,
1225 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1226 nondet: NonDet,
1227 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1228 where
1229 L: NoTick + NoAtomic,
1230 {
1231 let samples = self.location.source_interval(interval);
1232 sliced! {
1233 let snapshot = use(self, nondet);
1234 let sample_batch = use(samples, nondet);
1235
1236 snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1237 }
1238 .weaken_retries()
1239 }
1240
1241 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1242 /// implies that `B == Bounded`.
1243 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1244 where
1245 B: IsBounded,
1246 {
1247 Singleton::new(
1248 self.location.clone(),
1249 self.ir_node.replace(HydroNode::Placeholder),
1250 )
1251 }
1252
1253 #[expect(clippy::result_large_err, reason = "internal use only")]
1254 fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1255 if B::UnderlyingBound::BOUNDED {
1256 Ok(Singleton::new(
1257 self.location.clone(),
1258 self.ir_node.replace(HydroNode::Placeholder),
1259 ))
1260 } else {
1261 Err(self)
1262 }
1263 }
1264
1265 /// Clones this bounded singleton into a tick, returning a singleton that has the
1266 /// same value as the outer singleton. Because the outer singleton is bounded, this
1267 /// is deterministic because there is only a single immutable version.
1268 pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
1269 where
1270 B: IsBounded,
1271 T: Clone,
1272 {
1273 // TODO(shadaj): avoid printing simulator logs for this snapshot
1274 self.snapshot(
1275 tick,
1276 nondet!(/** bounded top-level singleton so deterministic */),
1277 )
1278 }
1279
1280 /// Converts this singleton into a [`Stream`] containing a single element, the value.
1281 ///
1282 /// # Example
1283 /// ```rust
1284 /// # #[cfg(feature = "deploy")] {
1285 /// # use hydro_lang::prelude::*;
1286 /// # use futures::StreamExt;
1287 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1288 /// let tick = process.tick();
1289 /// let batch_input = process
1290 /// .source_iter(q!(vec![123, 456]))
1291 /// .batch(&tick, nondet!(/** test */));
1292 /// batch_input.clone().chain(
1293 /// batch_input.count().into_stream()
1294 /// ).all_ticks()
1295 /// # }, |mut stream| async move {
1296 /// // [123, 456, 2]
1297 /// # for w in vec![123, 456, 2] {
1298 /// # assert_eq!(stream.next().await.unwrap(), w);
1299 /// # }
1300 /// # }));
1301 /// # }
1302 /// ```
1303 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1304 where
1305 B: IsBounded,
1306 {
1307 Stream::new(
1308 self.location.clone(),
1309 HydroNode::Cast {
1310 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1311 metadata: self.location.new_node_metadata(Stream::<
1312 T,
1313 Tick<L>,
1314 Bounded,
1315 TotalOrder,
1316 ExactlyOnce,
1317 >::collection_kind()),
1318 },
1319 )
1320 }
1321
1322 /// Resolves the singleton's [`Future`] value by blocking until it completes,
1323 /// producing a singleton of the resolved output.
1324 ///
1325 /// This is useful when the singleton contains an async computation that must
1326 /// be awaited before further processing. The future is polled to completion
1327 /// before the output value is emitted.
1328 ///
1329 /// # Example
1330 /// ```rust
1331 /// # #[cfg(feature = "deploy")] {
1332 /// # use hydro_lang::prelude::*;
1333 /// # use futures::StreamExt;
1334 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1335 /// let tick = process.tick();
1336 /// let singleton = tick.singleton(q!(5));
1337 /// singleton
1338 /// .map(q!(|v| async move { v * 2 }))
1339 /// .resolve_future_blocking()
1340 /// .all_ticks()
1341 /// # }, |mut stream| async move {
1342 /// // 10
1343 /// # assert_eq!(stream.next().await.unwrap(), 10);
1344 /// # }));
1345 /// # }
1346 /// ```
1347 pub fn resolve_future_blocking(
1348 self,
1349 ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1350 where
1351 T: Future,
1352 B: IsBounded,
1353 {
1354 Singleton::new(
1355 self.location.clone(),
1356 HydroNode::ResolveFuturesBlocking {
1357 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1358 metadata: self
1359 .location
1360 .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1361 },
1362 )
1363 }
1364}
1365
1366impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1367where
1368 L: Location<'a>,
1369{
1370 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1371 /// which will stream the value computed in _each_ tick as a separate stream element.
1372 ///
1373 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1374 /// producing one element in the output for each tick. This is useful for batched computations,
1375 /// where the results from each tick must be combined together.
1376 ///
1377 /// # Example
1378 /// ```rust
1379 /// # #[cfg(feature = "deploy")] {
1380 /// # use hydro_lang::prelude::*;
1381 /// # use futures::StreamExt;
1382 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1383 /// let tick = process.tick();
1384 /// # // ticks are lazy by default, forces the second tick to run
1385 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1386 /// # let batch_first_tick = process
1387 /// # .source_iter(q!(vec![1]))
1388 /// # .batch(&tick, nondet!(/** test */));
1389 /// # let batch_second_tick = process
1390 /// # .source_iter(q!(vec![1, 2, 3]))
1391 /// # .batch(&tick, nondet!(/** test */))
1392 /// # .defer_tick(); // appears on the second tick
1393 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1394 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1395 /// .count()
1396 /// .all_ticks()
1397 /// # }, |mut stream| async move {
1398 /// // [1, 3]
1399 /// # for w in vec![1, 3] {
1400 /// # assert_eq!(stream.next().await.unwrap(), w);
1401 /// # }
1402 /// # }));
1403 /// # }
1404 /// ```
1405 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1406 self.into_stream().all_ticks()
1407 }
1408
1409 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1410 /// which will stream the value computed in _each_ tick as a separate stream element.
1411 ///
1412 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1413 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1414 /// singleton's [`Tick`] context.
1415 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1416 self.into_stream().all_ticks_atomic()
1417 }
1418
1419 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1420 /// be asynchronously updated with the latest value of the singleton inside the tick.
1421 ///
1422 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1423 /// tick that tracks the inner value. This is useful for getting the value as of the
1424 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1425 ///
1426 /// # Example
1427 /// ```rust
1428 /// # #[cfg(feature = "deploy")] {
1429 /// # use hydro_lang::prelude::*;
1430 /// # use futures::StreamExt;
1431 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1432 /// let tick = process.tick();
1433 /// # // ticks are lazy by default, forces the second tick to run
1434 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1435 /// # let batch_first_tick = process
1436 /// # .source_iter(q!(vec![1]))
1437 /// # .batch(&tick, nondet!(/** test */));
1438 /// # let batch_second_tick = process
1439 /// # .source_iter(q!(vec![1, 2, 3]))
1440 /// # .batch(&tick, nondet!(/** test */))
1441 /// # .defer_tick(); // appears on the second tick
1442 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1443 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1444 /// .count()
1445 /// .latest()
1446 /// # .sample_eager(nondet!(/** test */))
1447 /// # }, |mut stream| async move {
1448 /// // asynchronously changes from 1 ~> 3
1449 /// # for w in vec![1, 3] {
1450 /// # assert_eq!(stream.next().await.unwrap(), w);
1451 /// # }
1452 /// # }));
1453 /// # }
1454 /// ```
1455 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1456 Singleton::new(
1457 self.location.outer().clone(),
1458 HydroNode::YieldConcat {
1459 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1460 metadata: self
1461 .location
1462 .outer()
1463 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1464 },
1465 )
1466 }
1467
1468 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1469 /// be updated with the latest value of the singleton inside the tick.
1470 ///
1471 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1472 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1473 /// singleton's [`Tick`] context.
1474 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1475 let out_location = Atomic {
1476 tick: self.location.clone(),
1477 };
1478 Singleton::new(
1479 out_location.clone(),
1480 HydroNode::YieldConcat {
1481 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1482 metadata: out_location
1483 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1484 },
1485 )
1486 }
1487}
1488
1489#[doc(hidden)]
1490/// Helper trait that determines the output collection type for [`Singleton::zip`].
1491///
1492/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1493/// [`Singleton`].
1494#[sealed::sealed]
1495pub trait ZipResult<'a, Other> {
1496 /// The output collection type.
1497 type Out;
1498 /// The type of the tupled output value.
1499 type ElementType;
1500 /// The type of the other collection's value.
1501 type OtherType;
1502 /// The location where the tupled result will be materialized.
1503 type Location: Location<'a>;
1504
1505 /// The location of the second input to the `zip`.
1506 fn other_location(other: &Other) -> Self::Location;
1507 /// The IR node of the second input to the `zip`.
1508 fn other_ir_node(other: Other) -> HydroNode;
1509
1510 /// Constructs the output live collection given an IR node containing the zip result.
1511 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1512}
1513
1514#[sealed::sealed]
1515impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1516where
1517 L: Location<'a>,
1518{
1519 type Out = Singleton<(T, U), L, B>;
1520 type ElementType = (T, U);
1521 type OtherType = U;
1522 type Location = L;
1523
1524 fn other_location(other: &Singleton<U, L, B>) -> L {
1525 other.location.clone()
1526 }
1527
1528 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1529 other.ir_node.replace(HydroNode::Placeholder)
1530 }
1531
1532 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1533 Singleton::new(
1534 location.clone(),
1535 HydroNode::Cast {
1536 inner: Box::new(ir_node),
1537 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1538 },
1539 )
1540 }
1541}
1542
1543#[sealed::sealed]
1544impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1545 for Singleton<T, L, B>
1546where
1547 L: Location<'a>,
1548{
1549 type Out = Optional<(T, U), L, B::UnderlyingBound>;
1550 type ElementType = (T, U);
1551 type OtherType = U;
1552 type Location = L;
1553
1554 fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1555 other.location.clone()
1556 }
1557
1558 fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1559 other.ir_node.replace(HydroNode::Placeholder)
1560 }
1561
1562 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1563 Optional::new(location, ir_node)
1564 }
1565}
1566
1567#[cfg(test)]
1568mod tests {
1569 #[cfg(feature = "deploy")]
1570 use futures::{SinkExt, StreamExt};
1571 #[cfg(feature = "deploy")]
1572 use hydro_deploy::Deployment;
1573 #[cfg(any(feature = "deploy", feature = "sim"))]
1574 use stageleft::q;
1575
1576 #[cfg(any(feature = "deploy", feature = "sim"))]
1577 use crate::compile::builder::FlowBuilder;
1578 #[cfg(feature = "deploy")]
1579 use crate::live_collections::stream::ExactlyOnce;
1580 #[cfg(any(feature = "deploy", feature = "sim"))]
1581 use crate::location::Location;
1582 #[cfg(any(feature = "deploy", feature = "sim"))]
1583 use crate::nondet::nondet;
1584
1585 #[cfg(feature = "deploy")]
1586 #[tokio::test]
1587 async fn tick_cycle_cardinality() {
1588 let mut deployment = Deployment::new();
1589
1590 let mut flow = FlowBuilder::new();
1591 let node = flow.process::<()>();
1592 let external = flow.external::<()>();
1593
1594 let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1595
1596 let node_tick = node.tick();
1597 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1598 let counts = singleton
1599 .clone()
1600 .into_stream()
1601 .count()
1602 .filter_if(
1603 input
1604 .batch(&node_tick, nondet!(/** testing */))
1605 .first()
1606 .is_some(),
1607 )
1608 .all_ticks()
1609 .send_bincode_external(&external);
1610 complete_cycle.complete_next_tick(singleton);
1611
1612 let nodes = flow
1613 .with_process(&node, deployment.Localhost())
1614 .with_external(&external, deployment.Localhost())
1615 .deploy(&mut deployment);
1616
1617 deployment.deploy().await.unwrap();
1618
1619 let mut tick_trigger = nodes.connect(input_send).await;
1620 let mut external_out = nodes.connect(counts).await;
1621
1622 deployment.start().await.unwrap();
1623
1624 tick_trigger.send(()).await.unwrap();
1625
1626 assert_eq!(external_out.next().await.unwrap(), 1);
1627
1628 tick_trigger.send(()).await.unwrap();
1629
1630 assert_eq!(external_out.next().await.unwrap(), 1);
1631 }
1632
1633 #[cfg(feature = "sim")]
1634 #[test]
1635 #[should_panic]
1636 fn sim_fold_intermediate_states() {
1637 let mut flow = FlowBuilder::new();
1638 let node = flow.process::<()>();
1639
1640 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1641 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1642
1643 let tick = node.tick();
1644 let batch = folded.snapshot(&tick, nondet!(/** test */));
1645 let out_recv = batch.all_ticks().sim_output();
1646
1647 flow.sim().exhaustive(async || {
1648 assert_eq!(out_recv.next().await.unwrap(), 10);
1649 });
1650 }
1651
1652 #[cfg(feature = "sim")]
1653 #[test]
1654 fn sim_fold_intermediate_state_count() {
1655 let mut flow = FlowBuilder::new();
1656 let node = flow.process::<()>();
1657
1658 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1659 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1660
1661 let tick = node.tick();
1662 let batch = folded.snapshot(&tick, nondet!(/** test */));
1663 let out_recv = batch.all_ticks().sim_output();
1664
1665 let instance_count = flow.sim().exhaustive(async || {
1666 let out = out_recv.collect::<Vec<_>>().await;
1667 assert_eq!(out.last(), Some(&10));
1668 });
1669
1670 assert_eq!(
1671 instance_count,
1672 16 // 2^4 possible subsets of intermediates (including initial state)
1673 )
1674 }
1675
1676 #[cfg(feature = "sim")]
1677 #[test]
1678 fn sim_fold_no_repeat_initial() {
1679 // check that we don't repeat the initial state of the fold in autonomous decisions
1680
1681 let mut flow = FlowBuilder::new();
1682 let node = flow.process::<()>();
1683
1684 let (in_port, input) = node.sim_input();
1685 let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1686
1687 let tick = node.tick();
1688 let batch = folded.snapshot(&tick, nondet!(/** test */));
1689 let out_recv = batch.all_ticks().sim_output();
1690
1691 flow.sim().exhaustive(async || {
1692 assert_eq!(out_recv.next().await.unwrap(), 0);
1693
1694 in_port.send(123);
1695
1696 assert_eq!(out_recv.next().await.unwrap(), 123);
1697 });
1698 }
1699
1700 #[cfg(feature = "sim")]
1701 #[test]
1702 #[should_panic]
1703 fn sim_fold_repeats_snapshots() {
1704 // when the tick is driven by a snapshot AND something else, the snapshot can
1705 // "stutter" and repeat the same state multiple times
1706
1707 let mut flow = FlowBuilder::new();
1708 let node = flow.process::<()>();
1709
1710 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1711 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1712
1713 let tick = node.tick();
1714 let batch = source
1715 .batch(&tick, nondet!(/** test */))
1716 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1717 let out_recv = batch.all_ticks().sim_output();
1718
1719 flow.sim().exhaustive(async || {
1720 if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1721 {
1722 panic!("repeated snapshot");
1723 }
1724 });
1725 }
1726
1727 #[cfg(feature = "sim")]
1728 #[test]
1729 fn sim_fold_repeats_snapshots_count() {
1730 // check the number of instances
1731 let mut flow = FlowBuilder::new();
1732 let node = flow.process::<()>();
1733
1734 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1735 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1736
1737 let tick = node.tick();
1738 let batch = source
1739 .batch(&tick, nondet!(/** test */))
1740 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1741 let out_recv = batch.all_ticks().sim_output();
1742
1743 let count = flow.sim().exhaustive(async || {
1744 let _ = out_recv.collect::<Vec<_>>().await;
1745 });
1746
1747 assert_eq!(count, 52);
1748 // don't have a combinatorial explanation for this number yet, but checked via logs
1749 }
1750
1751 #[cfg(feature = "sim")]
1752 #[test]
1753 fn sim_top_level_singleton_exhaustive() {
1754 // ensures that top-level singletons have only one snapshot
1755 let mut flow = FlowBuilder::new();
1756 let node = flow.process::<()>();
1757
1758 let singleton = node.singleton(q!(1));
1759 let tick = node.tick();
1760 let batch = singleton.snapshot(&tick, nondet!(/** test */));
1761 let out_recv = batch.all_ticks().sim_output();
1762
1763 let count = flow.sim().exhaustive(async || {
1764 let _ = out_recv.collect::<Vec<_>>().await;
1765 });
1766
1767 assert_eq!(count, 1);
1768 }
1769
1770 #[cfg(feature = "sim")]
1771 #[test]
1772 fn sim_top_level_singleton_join_count() {
1773 // if a tick consumes a static snapshot and a stream batch, only the batch require space
1774 // exploration
1775
1776 let mut flow = FlowBuilder::new();
1777 let node = flow.process::<()>();
1778
1779 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1780 let tick = node.tick();
1781 let batch = source_iter
1782 .batch(&tick, nondet!(/** test */))
1783 .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1784 let out_recv = batch.all_ticks().sim_output();
1785
1786 let instance_count = flow.sim().exhaustive(async || {
1787 let _ = out_recv.collect::<Vec<_>>().await;
1788 });
1789
1790 assert_eq!(
1791 instance_count,
1792 16 // 2^4 ways to split up (including a possibly empty first batch)
1793 )
1794 }
1795
1796 #[cfg(feature = "sim")]
1797 #[test]
1798 fn top_level_singleton_into_stream_no_replay() {
1799 let mut flow = FlowBuilder::new();
1800 let node = flow.process::<()>();
1801
1802 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1803 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1804
1805 let out_recv = folded.into_stream().sim_output();
1806
1807 flow.sim().exhaustive(async || {
1808 out_recv.assert_yields_only([10]).await;
1809 });
1810 }
1811
1812 #[cfg(feature = "sim")]
1813 #[test]
1814 fn inside_tick_singleton_zip() {
1815 use crate::live_collections::Stream;
1816 use crate::live_collections::sliced::sliced;
1817
1818 let mut flow = FlowBuilder::new();
1819 let node = flow.process::<()>();
1820
1821 let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1822 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1823
1824 let out_recv = sliced! {
1825 let v = use(folded, nondet!(/** test */));
1826 v.clone().zip(v).into_stream()
1827 }
1828 .sim_output();
1829
1830 let count = flow.sim().exhaustive(async || {
1831 let out = out_recv.collect::<Vec<_>>().await;
1832 assert_eq!(out.last(), Some(&(3, 3)));
1833 });
1834
1835 assert_eq!(count, 4);
1836 }
1837}