hydro_lang/
singleton.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7
8use crate::builder::FLOW_USED_MESSAGE;
9use crate::cycle::{
10    CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, ForwardRefMarker,
11    TickCycleMarker,
12};
13use crate::ir::{HydroLeaf, HydroNode, TeeNode};
14use crate::location::tick::{Atomic, NoAtomic};
15use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
16use crate::stream::{AtLeastOnce, ExactlyOnce};
17use crate::{Bounded, Optional, Stream, TotalOrder, Unbounded};
18
/// A dataflow handle that pairs a location with the IR node producing its value.
///
/// `Type` and `Bound` appear only inside the `PhantomData` marker; at runtime the
/// struct stores just the location and the IR node.
pub struct Singleton<Type, Loc, Bound> {
    /// The location this singleton is associated with.
    pub(crate) location: Loc,
    /// The IR node behind this singleton. It lives in a `RefCell` so that
    /// `Clone::clone` (which only has `&self`) can rewrite it into a `Tee` node.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker that ties the struct to its type parameters.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
25
26impl<'a, T, L, B> Singleton<T, L, B>
27where
28    L: Location<'a>,
29{
30    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
31        Singleton {
32            location,
33            ir_node: RefCell::new(ir_node),
34            _phantom: PhantomData,
35        }
36    }
37
38    fn location_kind(&self) -> LocationId {
39        self.location.id()
40    }
41}
42
43impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
44where
45    L: Location<'a>,
46{
47    fn from(singleton: Singleton<T, L, Bounded>) -> Self {
48        Singleton::new(singleton.location, singleton.ir_node.into_inner())
49    }
50}
51
52impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
53where
54    L: Location<'a>,
55{
56    fn defer_tick(self) -> Self {
57        Singleton::defer_tick(self)
58    }
59}
60
61impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
62where
63    L: Location<'a>,
64{
65    type Location = Tick<L>;
66
67    fn create_source(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
68        let location_id = location.id();
69        Singleton::new(
70            location.clone(),
71            HydroNode::Chain {
72                first: Box::new(HydroNode::CycleSource {
73                    ident,
74                    location_kind: location_id,
75                    metadata: location.new_node_metadata::<T>(),
76                }),
77                second: initial.ir_node.into_inner().into(),
78                metadata: location.new_node_metadata::<T>(),
79            },
80        )
81    }
82}
83
84impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
85where
86    L: Location<'a>,
87{
88    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
89        assert_eq!(
90            self.location.id(),
91            expected_location,
92            "locations do not match"
93        );
94        self.location
95            .flow_state()
96            .borrow_mut()
97            .leaves
98            .as_mut()
99            .expect(FLOW_USED_MESSAGE)
100            .push(HydroLeaf::CycleSink {
101                ident,
102                location_kind: self.location_kind(),
103                input: Box::new(self.ir_node.into_inner()),
104                metadata: self.location.new_node_metadata::<T>(),
105            });
106    }
107}
108
109impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
110where
111    L: Location<'a>,
112{
113    type Location = Tick<L>;
114
115    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
116        let location_id = location.id();
117        Singleton::new(
118            location.clone(),
119            HydroNode::CycleSource {
120                ident,
121                location_kind: location_id,
122                metadata: location.new_node_metadata::<T>(),
123            },
124        )
125    }
126}
127
128impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
129where
130    L: Location<'a>,
131{
132    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
133        assert_eq!(
134            self.location.id(),
135            expected_location,
136            "locations do not match"
137        );
138        self.location
139            .flow_state()
140            .borrow_mut()
141            .leaves
142            .as_mut()
143            .expect(FLOW_USED_MESSAGE)
144            .push(HydroLeaf::CycleSink {
145                ident,
146                location_kind: self.location_kind(),
147                input: Box::new(self.ir_node.into_inner()),
148                metadata: self.location.new_node_metadata::<T>(),
149            });
150    }
151}
152
153impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Singleton<T, L, B>
154where
155    L: Location<'a> + NoTick,
156{
157    type Location = L;
158
159    fn create_source(ident: syn::Ident, location: L) -> Self {
160        let location_id = location.id();
161        Singleton::new(
162            location.clone(),
163            HydroNode::Persist {
164                inner: Box::new(HydroNode::CycleSource {
165                    ident,
166                    location_kind: location_id,
167                    metadata: location.new_node_metadata::<T>(),
168                }),
169                metadata: location.new_node_metadata::<T>(),
170            },
171        )
172    }
173}
174
175impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Singleton<T, L, B>
176where
177    L: Location<'a> + NoTick,
178{
179    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
180        assert_eq!(
181            self.location.id(),
182            expected_location,
183            "locations do not match"
184        );
185        let metadata = self.location.new_node_metadata::<T>();
186        self.location
187            .flow_state()
188            .borrow_mut()
189            .leaves
190            .as_mut()
191            .expect(FLOW_USED_MESSAGE)
192            .push(HydroLeaf::CycleSink {
193                ident,
194                location_kind: self.location_kind(),
195                input: Box::new(HydroNode::Unpersist {
196                    inner: Box::new(self.ir_node.into_inner()),
197                    metadata: metadata.clone(),
198                }),
199                metadata,
200            });
201    }
202}
203
204impl<'a, T, L, B> Clone for Singleton<T, L, B>
205where
206    T: Clone,
207    L: Location<'a>,
208{
209    fn clone(&self) -> Self {
210        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
211            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
212            *self.ir_node.borrow_mut() = HydroNode::Tee {
213                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
214                metadata: self.location.new_node_metadata::<T>(),
215            };
216        }
217
218        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
219            Singleton {
220                location: self.location.clone(),
221                ir_node: HydroNode::Tee {
222                    inner: TeeNode(inner.0.clone()),
223                    metadata: metadata.clone(),
224                }
225                .into(),
226                _phantom: PhantomData,
227            }
228        } else {
229            unreachable!()
230        }
231    }
232}
233
234impl<'a, T, L, B> Singleton<T, L, B>
235where
236    L: Location<'a>,
237{
238    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
239    where
240        F: Fn(T) -> U + 'a,
241    {
242        let f = f.splice_fn1_ctx(&self.location).into();
243        Singleton::new(
244            self.location.clone(),
245            HydroNode::Map {
246                f,
247                input: Box::new(self.ir_node.into_inner()),
248                metadata: self.location.new_node_metadata::<U>(),
249            },
250        )
251    }
252
253    pub fn flat_map_ordered<U, I, F>(
254        self,
255        f: impl IntoQuotedMut<'a, F, L>,
256    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
257    where
258        I: IntoIterator<Item = U>,
259        F: Fn(T) -> I + 'a,
260    {
261        let f = f.splice_fn1_ctx(&self.location).into();
262        Stream::new(
263            self.location.clone(),
264            HydroNode::FlatMap {
265                f,
266                input: Box::new(self.ir_node.into_inner()),
267                metadata: self.location.new_node_metadata::<U>(),
268            },
269        )
270    }
271
272    pub fn flat_map_unordered<U, I, F>(
273        self,
274        f: impl IntoQuotedMut<'a, F, L>,
275    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
276    where
277        I: IntoIterator<Item = U>,
278        F: Fn(T) -> I + 'a,
279    {
280        let f = f.splice_fn1_ctx(&self.location).into();
281        Stream::new(
282            self.location.clone(),
283            HydroNode::FlatMap {
284                f,
285                input: Box::new(self.ir_node.into_inner()),
286                metadata: self.location.new_node_metadata::<U>(),
287            },
288        )
289    }
290
291    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
292    where
293        F: Fn(&T) -> bool + 'a,
294    {
295        let f = f.splice_fn1_borrow_ctx(&self.location).into();
296        Optional::new(
297            self.location.clone(),
298            HydroNode::Filter {
299                f,
300                input: Box::new(self.ir_node.into_inner()),
301                metadata: self.location.new_node_metadata::<T>(),
302            },
303        )
304    }
305
306    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
307    where
308        F: Fn(T) -> Option<U> + 'a,
309    {
310        let f = f.splice_fn1_ctx(&self.location).into();
311        Optional::new(
312            self.location.clone(),
313            HydroNode::FilterMap {
314                f,
315                input: Box::new(self.ir_node.into_inner()),
316                metadata: self.location.new_node_metadata::<U>(),
317            },
318        )
319    }
320
321    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
322    where
323        Self: ZipResult<'a, O, Location = L>,
324    {
325        check_matching_location(&self.location, &Self::other_location(&other));
326
327        if L::is_top_level() {
328            let left_ir_node = self.ir_node.into_inner();
329            let left_ir_node_metadata = left_ir_node.metadata().clone();
330            let right_ir_node = Self::other_ir_node(other);
331            let right_ir_node_metadata = right_ir_node.metadata().clone();
332
333            Self::make(
334                self.location.clone(),
335                HydroNode::Persist {
336                    inner: Box::new(HydroNode::CrossSingleton {
337                        left: Box::new(HydroNode::Unpersist {
338                            inner: Box::new(left_ir_node),
339                            metadata: left_ir_node_metadata,
340                        }),
341                        right: Box::new(HydroNode::Unpersist {
342                            inner: Box::new(right_ir_node),
343                            metadata: right_ir_node_metadata,
344                        }),
345                        metadata: self
346                            .location
347                            .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
348                    }),
349                    metadata: self
350                        .location
351                        .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
352                },
353            )
354        } else {
355            Self::make(
356                self.location.clone(),
357                HydroNode::CrossSingleton {
358                    left: Box::new(self.ir_node.into_inner()),
359                    right: Box::new(Self::other_ir_node(other)),
360                    metadata: self
361                        .location
362                        .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
363                },
364            )
365        }
366    }
367
368    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
369    where
370        Self: ZipResult<
371                'a,
372                Optional<(), L, Bounded>,
373                Location = L,
374                Out = Optional<(T, ()), L, Bounded>,
375            >,
376    {
377        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
378    }
379
380    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
381    where
382        Singleton<T, L, B>: ZipResult<
383                'a,
384                Optional<(), L, Bounded>,
385                Location = L,
386                Out = Optional<(T, ()), L, Bounded>,
387            >,
388    {
389        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
390    }
391}
392
393impl<'a, T, L, B> Singleton<T, Atomic<L>, B>
394where
395    L: Location<'a> + NoTick,
396{
397    /// Returns a singleton value corresponding to the latest snapshot of the singleton
398    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
399    /// at least all relevant data that contributed to the snapshot at tick `t`.
400    ///
401    /// # Safety
402    /// Because this picks a snapshot of a singleton whose value is continuously changing,
403    /// the output singleton has a non-deterministic value since the snapshot can be at an
404    /// arbitrary point in time.
405    pub unsafe fn latest_tick(self) -> Singleton<T, Tick<L>, Bounded> {
406        Singleton::new(
407            self.location.clone().tick,
408            HydroNode::Unpersist {
409                inner: Box::new(self.ir_node.into_inner()),
410                metadata: self.location.new_node_metadata::<T>(),
411            },
412        )
413    }
414
415    pub fn end_atomic(self) -> Optional<T, L, B> {
416        Optional::new(self.location.tick.l, self.ir_node.into_inner())
417    }
418}
419
420impl<'a, T, L, B> Singleton<T, L, B>
421where
422    L: Location<'a> + NoTick + NoAtomic,
423{
424    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
425        Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
426    }
427
428    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
429    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
430    /// relevant data that contributed to the snapshot at tick `t`.
431    ///
432    /// # Safety
433    /// Because this picks a snapshot of a singleton whose value is continuously changing,
434    /// the output singleton has a non-deterministic value since the snapshot can be at an
435    /// arbitrary point in time.
436    pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
437    where
438        L: NoTick,
439    {
440        unsafe { self.atomic(tick).latest_tick() }
441    }
442
443    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
444    /// with order corresponding to increasing prefixes of data contributing to the singleton.
445    ///
446    /// # Safety
447    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
448    /// to non-deterministic batching and arrival of inputs, the output stream is
449    /// non-deterministic.
450    pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
451        let tick = self.location.tick();
452
453        unsafe {
454            // SAFETY: source of intentional non-determinism
455            self.latest_tick(&tick).all_ticks().weakest_retries()
456        }
457    }
458
459    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
460    /// value taken at various points in time. Because the input singleton may be
461    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
462    /// represent the value of the singleton given some prefix of the streams leading up to
463    /// it.
464    ///
465    /// # Safety
466    /// The output stream is non-deterministic in which elements are sampled, since this
467    /// is controlled by a clock.
468    pub unsafe fn sample_every(
469        self,
470        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
471    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
472        let samples = unsafe {
473            // SAFETY: source of intentional non-determinism
474            self.location.source_interval(interval)
475        };
476        let tick = self.location.tick();
477
478        unsafe {
479            // SAFETY: source of intentional non-determinism
480            self.latest_tick(&tick)
481                .continue_if(samples.tick_batch(&tick).first())
482                .all_ticks()
483                .weakest_retries()
484        }
485    }
486}
487
488impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
489where
490    L: Location<'a>,
491{
492    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
493        Stream::new(
494            self.location.outer().clone(),
495            HydroNode::Persist {
496                inner: Box::new(self.ir_node.into_inner()),
497                metadata: self.location.new_node_metadata::<T>(),
498            },
499        )
500    }
501
502    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
503        Stream::new(
504            Atomic {
505                tick: self.location.clone(),
506            },
507            HydroNode::Persist {
508                inner: Box::new(self.ir_node.into_inner()),
509                metadata: self.location.new_node_metadata::<T>(),
510            },
511        )
512    }
513
514    pub fn latest(self) -> Singleton<T, L, Unbounded> {
515        Singleton::new(
516            self.location.outer().clone(),
517            HydroNode::Persist {
518                inner: Box::new(self.ir_node.into_inner()),
519                metadata: self.location.new_node_metadata::<T>(),
520            },
521        )
522    }
523
524    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
525        Singleton::new(
526            Atomic {
527                tick: self.location.clone(),
528            },
529            HydroNode::Persist {
530                inner: Box::new(self.ir_node.into_inner()),
531                metadata: self.location.new_node_metadata::<T>(),
532            },
533        )
534    }
535
536    pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
537        Singleton::new(
538            self.location.clone(),
539            HydroNode::DeferTick {
540                input: Box::new(self.ir_node.into_inner()),
541                metadata: self.location.new_node_metadata::<T>(),
542            },
543        )
544    }
545
546    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
547        Stream::new(
548            self.location.clone(),
549            HydroNode::Persist {
550                inner: Box::new(self.ir_node.into_inner()),
551                metadata: self.location.new_node_metadata::<T>(),
552            },
553        )
554    }
555
556    pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
557        Optional::new(
558            self.location.clone(),
559            HydroNode::Delta {
560                inner: Box::new(self.ir_node.into_inner()),
561                metadata: self.location.new_node_metadata::<T>(),
562            },
563        )
564    }
565
566    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
567        Stream::new(self.location, self.ir_node.into_inner())
568    }
569}
570
/// Describes how a collection is combined with `Other` by `Singleton::zip`.
///
/// `zip` uses this trait to read the other operand's location and IR node and
/// to wrap the combined IR node in the right output collection type.
pub trait ZipResult<'a, Other> {
    /// The collection type returned by `zip`.
    type Out;
    /// The element type used for the metadata of the combined node.
    type ElementType;
    /// The location type shared by both operands.
    type Location;

    /// Returns the location of the other operand.
    fn other_location(other: &Other) -> Self::Location;
    /// Consumes the other operand and returns its IR node.
    fn other_ir_node(other: Other) -> HydroNode;

    /// Builds the output collection from a location and the combined IR node.
    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
}
581
582impl<'a, T, U, L, B> ZipResult<'a, Singleton<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
583where
584    U: Clone,
585    L: Location<'a>,
586{
587    type Out = Singleton<(T, U), Tick<L>, B>;
588    type ElementType = (T, U);
589    type Location = Tick<L>;
590
591    fn other_location(other: &Singleton<U, Tick<L>, B>) -> Tick<L> {
592        other.location.clone()
593    }
594
595    fn other_ir_node(other: Singleton<U, Tick<L>, B>) -> HydroNode {
596        other.ir_node.into_inner()
597    }
598
599    fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
600        Singleton::new(location, ir_node)
601    }
602}
603
604impl<'a, T, U, L, B> ZipResult<'a, Optional<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
605where
606    U: Clone,
607    L: Location<'a>,
608{
609    type Out = Optional<(T, U), Tick<L>, B>;
610    type ElementType = (T, U);
611    type Location = Tick<L>;
612
613    fn other_location(other: &Optional<U, Tick<L>, B>) -> Tick<L> {
614        other.location.clone()
615    }
616
617    fn other_ir_node(other: Optional<U, Tick<L>, B>) -> HydroNode {
618        other.ir_node.into_inner()
619    }
620
621    fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
622        Optional::new(location, ir_node)
623    }
624}