hydro_lang/
singleton.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7
8use crate::builder::FLOW_USED_MESSAGE;
9use crate::cycle::{
10    CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, ForwardRefMarker,
11    TickCycleMarker,
12};
13use crate::ir::{HydroLeaf, HydroNode, TeeNode};
14use crate::location::tick::{Atomic, NoAtomic};
15use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
16use crate::{Bounded, Optional, Stream, Unbounded};
17
18pub struct Singleton<Type, Loc, Bound> {
19    pub(crate) location: Loc,
20    pub(crate) ir_node: RefCell<HydroNode>,
21
22    _phantom: PhantomData<(Type, Loc, Bound)>,
23}
24
25impl<'a, T, L, B> Singleton<T, L, B>
26where
27    L: Location<'a>,
28{
29    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
30        Singleton {
31            location,
32            ir_node: RefCell::new(ir_node),
33            _phantom: PhantomData,
34        }
35    }
36
37    fn location_kind(&self) -> LocationId {
38        self.location.id()
39    }
40}
41
42impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
43where
44    L: Location<'a>,
45{
46    fn from(singleton: Singleton<T, L, Bounded>) -> Self {
47        Singleton::new(singleton.location, singleton.ir_node.into_inner())
48    }
49}
50
51impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
52where
53    L: Location<'a>,
54{
55    fn defer_tick(self) -> Self {
56        Singleton::defer_tick(self)
57    }
58}
59
60impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
61where
62    L: Location<'a>,
63{
64    type Location = Tick<L>;
65
66    fn create_source(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
67        let location_id = location.id();
68        Singleton::new(
69            location.clone(),
70            HydroNode::Chain {
71                first: Box::new(HydroNode::CycleSource {
72                    ident,
73                    location_kind: location_id,
74                    metadata: location.new_node_metadata::<T>(),
75                }),
76                second: initial.ir_node.into_inner().into(),
77                metadata: location.new_node_metadata::<T>(),
78            },
79        )
80    }
81}
82
83impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
84where
85    L: Location<'a>,
86{
87    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
88        assert_eq!(
89            self.location.id(),
90            expected_location,
91            "locations do not match"
92        );
93        self.location
94            .flow_state()
95            .borrow_mut()
96            .leaves
97            .as_mut()
98            .expect(FLOW_USED_MESSAGE)
99            .push(HydroLeaf::CycleSink {
100                ident,
101                location_kind: self.location_kind(),
102                input: Box::new(self.ir_node.into_inner()),
103                metadata: self.location.new_node_metadata::<T>(),
104            });
105    }
106}
107
108impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
109where
110    L: Location<'a>,
111{
112    type Location = Tick<L>;
113
114    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
115        let location_id = location.id();
116        Singleton::new(
117            location.clone(),
118            HydroNode::CycleSource {
119                ident,
120                location_kind: location_id,
121                metadata: location.new_node_metadata::<T>(),
122            },
123        )
124    }
125}
126
127impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
128where
129    L: Location<'a>,
130{
131    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
132        assert_eq!(
133            self.location.id(),
134            expected_location,
135            "locations do not match"
136        );
137        self.location
138            .flow_state()
139            .borrow_mut()
140            .leaves
141            .as_mut()
142            .expect(FLOW_USED_MESSAGE)
143            .push(HydroLeaf::CycleSink {
144                ident,
145                location_kind: self.location_kind(),
146                input: Box::new(self.ir_node.into_inner()),
147                metadata: self.location.new_node_metadata::<T>(),
148            });
149    }
150}
151
152impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Singleton<T, L, B>
153where
154    L: Location<'a> + NoTick,
155{
156    type Location = L;
157
158    fn create_source(ident: syn::Ident, location: L) -> Self {
159        let location_id = location.id();
160        Singleton::new(
161            location.clone(),
162            HydroNode::Persist {
163                inner: Box::new(HydroNode::CycleSource {
164                    ident,
165                    location_kind: location_id,
166                    metadata: location.new_node_metadata::<T>(),
167                }),
168                metadata: location.new_node_metadata::<T>(),
169            },
170        )
171    }
172}
173
174impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Singleton<T, L, B>
175where
176    L: Location<'a> + NoTick,
177{
178    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
179        assert_eq!(
180            self.location.id(),
181            expected_location,
182            "locations do not match"
183        );
184        let metadata = self.location.new_node_metadata::<T>();
185        self.location
186            .flow_state()
187            .borrow_mut()
188            .leaves
189            .as_mut()
190            .expect(FLOW_USED_MESSAGE)
191            .push(HydroLeaf::CycleSink {
192                ident,
193                location_kind: self.location_kind(),
194                input: Box::new(HydroNode::Unpersist {
195                    inner: Box::new(self.ir_node.into_inner()),
196                    metadata: metadata.clone(),
197                }),
198                metadata,
199            });
200    }
201}
202
203impl<'a, T, L, B> Clone for Singleton<T, L, B>
204where
205    T: Clone,
206    L: Location<'a>,
207{
208    fn clone(&self) -> Self {
209        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
210            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
211            *self.ir_node.borrow_mut() = HydroNode::Tee {
212                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
213                metadata: self.location.new_node_metadata::<T>(),
214            };
215        }
216
217        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
218            Singleton {
219                location: self.location.clone(),
220                ir_node: HydroNode::Tee {
221                    inner: TeeNode(inner.0.clone()),
222                    metadata: metadata.clone(),
223                }
224                .into(),
225                _phantom: PhantomData,
226            }
227        } else {
228            unreachable!()
229        }
230    }
231}
232
233impl<'a, T, L, B> Singleton<T, L, B>
234where
235    L: Location<'a>,
236{
237    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
238    where
239        F: Fn(T) -> U + 'a,
240    {
241        let f = f.splice_fn1_ctx(&self.location).into();
242        Singleton::new(
243            self.location.clone(),
244            HydroNode::Map {
245                f,
246                input: Box::new(self.ir_node.into_inner()),
247                metadata: self.location.new_node_metadata::<U>(),
248            },
249        )
250    }
251
252    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B>
253    where
254        I: IntoIterator<Item = U>,
255        F: Fn(T) -> I + 'a,
256    {
257        let f = f.splice_fn1_ctx(&self.location).into();
258        Stream::new(
259            self.location.clone(),
260            HydroNode::FlatMap {
261                f,
262                input: Box::new(self.ir_node.into_inner()),
263                metadata: self.location.new_node_metadata::<U>(),
264            },
265        )
266    }
267
268    pub fn flat_map_unordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B>
269    where
270        I: IntoIterator<Item = U>,
271        F: Fn(T) -> I + 'a,
272    {
273        let f = f.splice_fn1_ctx(&self.location).into();
274        Stream::new(
275            self.location.clone(),
276            HydroNode::FlatMap {
277                f,
278                input: Box::new(self.ir_node.into_inner()),
279                metadata: self.location.new_node_metadata::<U>(),
280            },
281        )
282    }
283
284    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
285    where
286        F: Fn(&T) -> bool + 'a,
287    {
288        let f = f.splice_fn1_borrow_ctx(&self.location).into();
289        Optional::new(
290            self.location.clone(),
291            HydroNode::Filter {
292                f,
293                input: Box::new(self.ir_node.into_inner()),
294                metadata: self.location.new_node_metadata::<T>(),
295            },
296        )
297    }
298
299    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
300    where
301        F: Fn(T) -> Option<U> + 'a,
302    {
303        let f = f.splice_fn1_ctx(&self.location).into();
304        Optional::new(
305            self.location.clone(),
306            HydroNode::FilterMap {
307                f,
308                input: Box::new(self.ir_node.into_inner()),
309                metadata: self.location.new_node_metadata::<U>(),
310            },
311        )
312    }
313
314    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
315    where
316        Self: ZipResult<'a, O, Location = L>,
317    {
318        check_matching_location(&self.location, &Self::other_location(&other));
319
320        if L::is_top_level() {
321            let left_ir_node = self.ir_node.into_inner();
322            let left_ir_node_metadata = left_ir_node.metadata().clone();
323            let right_ir_node = Self::other_ir_node(other);
324            let right_ir_node_metadata = right_ir_node.metadata().clone();
325
326            Self::make(
327                self.location.clone(),
328                HydroNode::Persist {
329                    inner: Box::new(HydroNode::CrossSingleton {
330                        left: Box::new(HydroNode::Unpersist {
331                            inner: Box::new(left_ir_node),
332                            metadata: left_ir_node_metadata,
333                        }),
334                        right: Box::new(HydroNode::Unpersist {
335                            inner: Box::new(right_ir_node),
336                            metadata: right_ir_node_metadata,
337                        }),
338                        metadata: self
339                            .location
340                            .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
341                    }),
342                    metadata: self
343                        .location
344                        .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
345                },
346            )
347        } else {
348            Self::make(
349                self.location.clone(),
350                HydroNode::CrossSingleton {
351                    left: Box::new(self.ir_node.into_inner()),
352                    right: Box::new(Self::other_ir_node(other)),
353                    metadata: self
354                        .location
355                        .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
356                },
357            )
358        }
359    }
360
361    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
362    where
363        Self: ZipResult<
364                'a,
365                Optional<(), L, Bounded>,
366                Location = L,
367                Out = Optional<(T, ()), L, Bounded>,
368            >,
369    {
370        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
371    }
372
373    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
374    where
375        Singleton<T, L, B>: ZipResult<
376                'a,
377                Optional<(), L, Bounded>,
378                Location = L,
379                Out = Optional<(T, ()), L, Bounded>,
380            >,
381    {
382        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
383    }
384}
385
386impl<'a, T, L, B> Singleton<T, Atomic<L>, B>
387where
388    L: Location<'a> + NoTick,
389{
390    /// Returns a singleton value corresponding to the latest snapshot of the singleton
391    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
392    /// at least all relevant data that contributed to the snapshot at tick `t`.
393    ///
394    /// # Safety
395    /// Because this picks a snapshot of a singleton whose value is continuously changing,
396    /// the output singleton has a non-deterministic value since the snapshot can be at an
397    /// arbitrary point in time.
398    pub unsafe fn latest_tick(self) -> Singleton<T, Tick<L>, Bounded> {
399        Singleton::new(
400            self.location.clone().tick,
401            HydroNode::Unpersist {
402                inner: Box::new(self.ir_node.into_inner()),
403                metadata: self.location.new_node_metadata::<T>(),
404            },
405        )
406    }
407
408    pub fn end_atomic(self) -> Optional<T, L, B> {
409        Optional::new(self.location.tick.l, self.ir_node.into_inner())
410    }
411}
412
413impl<'a, T, L, B> Singleton<T, L, B>
414where
415    L: Location<'a> + NoTick + NoAtomic,
416{
417    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
418        Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
419    }
420
421    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
422    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
423    /// relevant data that contributed to the snapshot at tick `t`.
424    ///
425    /// # Safety
426    /// Because this picks a snapshot of a singleton whose value is continuously changing,
427    /// the output singleton has a non-deterministic value since the snapshot can be at an
428    /// arbitrary point in time.
429    pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
430    where
431        L: NoTick,
432    {
433        unsafe { self.atomic(tick).latest_tick() }
434    }
435
436    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
437    /// with order corresponding to increasing prefixes of data contributing to the singleton.
438    ///
439    /// # Safety
440    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
441    /// to non-deterministic batching and arrival of inputs, the output stream is
442    /// non-deterministic.
443    pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded> {
444        let tick = self.location.tick();
445
446        unsafe {
447            // SAFETY: source of intentional non-determinism
448            self.latest_tick(&tick).all_ticks()
449        }
450    }
451
452    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
453    /// value taken at various points in time. Because the input singleton may be
454    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
455    /// represent the value of the singleton given some prefix of the streams leading up to
456    /// it.
457    ///
458    /// # Safety
459    /// The output stream is non-deterministic in which elements are sampled, since this
460    /// is controlled by a clock.
461    pub unsafe fn sample_every(
462        self,
463        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
464    ) -> Stream<T, L, Unbounded> {
465        let samples = unsafe {
466            // SAFETY: source of intentional non-determinism
467            self.location.source_interval(interval)
468        };
469        let tick = self.location.tick();
470
471        unsafe {
472            // SAFETY: source of intentional non-determinism
473            self.latest_tick(&tick)
474                .continue_if(samples.tick_batch(&tick).first())
475                .all_ticks()
476        }
477    }
478}
479
480impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
481where
482    L: Location<'a>,
483{
484    pub fn all_ticks(self) -> Stream<T, L, Unbounded> {
485        Stream::new(
486            self.location.outer().clone(),
487            HydroNode::Persist {
488                inner: Box::new(self.ir_node.into_inner()),
489                metadata: self.location.new_node_metadata::<T>(),
490            },
491        )
492    }
493
494    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded> {
495        Stream::new(
496            Atomic {
497                tick: self.location.clone(),
498            },
499            HydroNode::Persist {
500                inner: Box::new(self.ir_node.into_inner()),
501                metadata: self.location.new_node_metadata::<T>(),
502            },
503        )
504    }
505
506    pub fn latest(self) -> Singleton<T, L, Unbounded> {
507        Singleton::new(
508            self.location.outer().clone(),
509            HydroNode::Persist {
510                inner: Box::new(self.ir_node.into_inner()),
511                metadata: self.location.new_node_metadata::<T>(),
512            },
513        )
514    }
515
516    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
517        Singleton::new(
518            Atomic {
519                tick: self.location.clone(),
520            },
521            HydroNode::Persist {
522                inner: Box::new(self.ir_node.into_inner()),
523                metadata: self.location.new_node_metadata::<T>(),
524            },
525        )
526    }
527
528    pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
529        Singleton::new(
530            self.location.clone(),
531            HydroNode::DeferTick {
532                input: Box::new(self.ir_node.into_inner()),
533                metadata: self.location.new_node_metadata::<T>(),
534            },
535        )
536    }
537
538    pub fn persist(self) -> Stream<T, Tick<L>, Bounded> {
539        Stream::new(
540            self.location.clone(),
541            HydroNode::Persist {
542                inner: Box::new(self.ir_node.into_inner()),
543                metadata: self.location.new_node_metadata::<T>(),
544            },
545        )
546    }
547
548    pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
549        Optional::new(
550            self.location.clone(),
551            HydroNode::Delta {
552                inner: Box::new(self.ir_node.into_inner()),
553                metadata: self.location.new_node_metadata::<T>(),
554            },
555        )
556    }
557
558    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded> {
559        Stream::new(self.location, self.ir_node.into_inner())
560    }
561}
562
563pub trait ZipResult<'a, Other> {
564    type Out;
565    type ElementType;
566    type Location;
567
568    fn other_location(other: &Other) -> Self::Location;
569    fn other_ir_node(other: Other) -> HydroNode;
570
571    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
572}
573
574impl<'a, T, U, L, B> ZipResult<'a, Singleton<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
575where
576    U: Clone,
577    L: Location<'a>,
578{
579    type Out = Singleton<(T, U), Tick<L>, B>;
580    type ElementType = (T, U);
581    type Location = Tick<L>;
582
583    fn other_location(other: &Singleton<U, Tick<L>, B>) -> Tick<L> {
584        other.location.clone()
585    }
586
587    fn other_ir_node(other: Singleton<U, Tick<L>, B>) -> HydroNode {
588        other.ir_node.into_inner()
589    }
590
591    fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
592        Singleton::new(location, ir_node)
593    }
594}
595
596impl<'a, T, U, L, B> ZipResult<'a, Optional<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
597where
598    U: Clone,
599    L: Location<'a>,
600{
601    type Out = Optional<(T, U), Tick<L>, B>;
602    type ElementType = (T, U);
603    type Location = Tick<L>;
604
605    fn other_location(other: &Optional<U, Tick<L>, B>) -> Tick<L> {
606        other.location.clone()
607    }
608
609    fn other_ir_node(other: Optional<U, Tick<L>, B>) -> HydroNode {
610        other.ir_node.into_inner()
611    }
612
613    fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
614        Optional::new(location, ir_node)
615    }
616}