hydro_lang/
optional.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7use syn::parse_quote;
8
9use crate::builder::FLOW_USED_MESSAGE;
10use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
11use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode};
12use crate::location::tick::{Atomic, NoAtomic};
13use crate::location::{LocationId, NoTick, check_matching_location};
14use crate::singleton::ZipResult;
15use crate::stream::{AtLeastOnce, ExactlyOnce, NoOrder};
16use crate::{Bounded, Location, Singleton, Stream, Tick, TotalOrder, Unbounded};
17
18pub struct Optional<Type, Loc, Bound> {
19    pub(crate) location: Loc,
20    pub(crate) ir_node: RefCell<HydroNode>,
21
22    _phantom: PhantomData<(Type, Loc, Bound)>,
23}
24
25impl<'a, T, L, B> Optional<T, L, B>
26where
27    L: Location<'a>,
28{
29    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
30        Optional {
31            location,
32            ir_node: RefCell::new(ir_node),
33            _phantom: PhantomData,
34        }
35    }
36
37    pub fn some(singleton: Singleton<T, L, B>) -> Self {
38        Optional::new(singleton.location, singleton.ir_node.into_inner())
39    }
40
41    fn location_kind(&self) -> LocationId {
42        self.location.id()
43    }
44}
45
46impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
47where
48    L: Location<'a>,
49{
50    fn defer_tick(self) -> Self {
51        Optional::defer_tick(self)
52    }
53}
54
55impl<'a, T, L> CycleCollection<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
56where
57    L: Location<'a>,
58{
59    type Location = Tick<L>;
60
61    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
62        let location_id = location.id();
63        Optional::new(
64            location.clone(),
65            HydroNode::CycleSource {
66                ident,
67                location_kind: location_id,
68                metadata: location.new_node_metadata::<T>(),
69            },
70        )
71    }
72}
73
74impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
75where
76    L: Location<'a>,
77{
78    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
79        assert_eq!(
80            self.location.id(),
81            expected_location,
82            "locations do not match"
83        );
84        self.location
85            .flow_state()
86            .borrow_mut()
87            .leaves
88            .as_mut()
89            .expect(FLOW_USED_MESSAGE)
90            .push(HydroLeaf::CycleSink {
91                ident,
92                location_kind: self.location_kind(),
93                input: Box::new(self.ir_node.into_inner()),
94                metadata: self.location.new_node_metadata::<T>(),
95            });
96    }
97}
98
99impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
100where
101    L: Location<'a>,
102{
103    type Location = Tick<L>;
104
105    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
106        let location_id = location.id();
107        Optional::new(
108            location.clone(),
109            HydroNode::CycleSource {
110                ident,
111                location_kind: location_id,
112                metadata: location.new_node_metadata::<T>(),
113            },
114        )
115    }
116}
117
118impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
119where
120    L: Location<'a>,
121{
122    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
123        assert_eq!(
124            self.location.id(),
125            expected_location,
126            "locations do not match"
127        );
128        self.location
129            .flow_state()
130            .borrow_mut()
131            .leaves
132            .as_mut()
133            .expect(FLOW_USED_MESSAGE)
134            .push(HydroLeaf::CycleSink {
135                ident,
136                location_kind: self.location_kind(),
137                input: Box::new(self.ir_node.into_inner()),
138                metadata: self.location.new_node_metadata::<T>(),
139            });
140    }
141}
142
143impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Optional<T, L, B>
144where
145    L: Location<'a> + NoTick,
146{
147    type Location = L;
148
149    fn create_source(ident: syn::Ident, location: L) -> Self {
150        let location_id = location.id();
151        Optional::new(
152            location.clone(),
153            HydroNode::Persist {
154                inner: Box::new(HydroNode::CycleSource {
155                    ident,
156                    location_kind: location_id,
157                    metadata: location.new_node_metadata::<T>(),
158                }),
159                metadata: location.new_node_metadata::<T>(),
160            },
161        )
162    }
163}
164
165impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Optional<T, L, B>
166where
167    L: Location<'a> + NoTick,
168{
169    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
170        assert_eq!(
171            self.location.id(),
172            expected_location,
173            "locations do not match"
174        );
175        let metadata = self.location.new_node_metadata::<T>();
176        self.location
177            .flow_state()
178            .borrow_mut()
179            .leaves
180            .as_mut()
181            .expect(FLOW_USED_MESSAGE)
182            .push(HydroLeaf::CycleSink {
183                ident,
184                location_kind: self.location_kind(),
185                input: Box::new(HydroNode::Unpersist {
186                    inner: Box::new(self.ir_node.into_inner()),
187                    metadata: metadata.clone(),
188                }),
189                metadata,
190            });
191    }
192}
193
194impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
195where
196    L: Location<'a>,
197{
198    fn from(singleton: Optional<T, L, Bounded>) -> Self {
199        Optional::new(singleton.location, singleton.ir_node.into_inner())
200    }
201}
202
203impl<'a, T, L, B> From<Singleton<T, L, B>> for Optional<T, L, B>
204where
205    L: Location<'a>,
206{
207    fn from(singleton: Singleton<T, L, B>) -> Self {
208        Optional::some(singleton)
209    }
210}
211
212impl<'a, T, L, B> Clone for Optional<T, L, B>
213where
214    T: Clone,
215    L: Location<'a>,
216{
217    fn clone(&self) -> Self {
218        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
219            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
220            *self.ir_node.borrow_mut() = HydroNode::Tee {
221                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
222                metadata: self.location.new_node_metadata::<T>(),
223            };
224        }
225
226        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
227            Optional {
228                location: self.location.clone(),
229                ir_node: HydroNode::Tee {
230                    inner: TeeNode(inner.0.clone()),
231                    metadata: metadata.clone(),
232                }
233                .into(),
234                _phantom: PhantomData,
235            }
236        } else {
237            unreachable!()
238        }
239    }
240}
241
242impl<'a, T, L, B> Optional<T, L, B>
243where
244    L: Location<'a>,
245{
246    /// Transforms the optional value by applying a function `f` to it,
247    /// continuously as the input is updated.
248    ///
249    /// Whenever the optional is empty, the output optional is also empty.
250    ///
251    /// # Example
252    /// ```rust
253    /// # use hydro_lang::*;
254    /// # use futures::StreamExt;
255    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
256    /// let tick = process.tick();
257    /// let optional = tick.optional_first_tick(q!(1));
258    /// optional.map(q!(|v| v + 1)).all_ticks()
259    /// # }, |mut stream| async move {
260    /// // 2
261    /// # assert_eq!(stream.next().await.unwrap(), 2);
262    /// # }));
263    /// ```
264    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
265    where
266        F: Fn(T) -> U + 'a,
267    {
268        let f = f.splice_fn1_ctx(&self.location).into();
269        Optional::new(
270            self.location.clone(),
271            HydroNode::Map {
272                f,
273                input: Box::new(self.ir_node.into_inner()),
274                metadata: self.location.new_node_metadata::<U>(),
275            },
276        )
277    }
278
279    pub fn flat_map_ordered<U, I, F>(
280        self,
281        f: impl IntoQuotedMut<'a, F, L>,
282    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
283    where
284        I: IntoIterator<Item = U>,
285        F: Fn(T) -> I + 'a,
286    {
287        let f = f.splice_fn1_ctx(&self.location).into();
288        Stream::new(
289            self.location.clone(),
290            HydroNode::FlatMap {
291                f,
292                input: Box::new(self.ir_node.into_inner()),
293                metadata: self.location.new_node_metadata::<U>(),
294            },
295        )
296    }
297
298    pub fn flat_map_unordered<U, I, F>(
299        self,
300        f: impl IntoQuotedMut<'a, F, L>,
301    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
302    where
303        I: IntoIterator<Item = U>,
304        F: Fn(T) -> I + 'a,
305    {
306        let f = f.splice_fn1_ctx(&self.location).into();
307        Stream::new(
308            self.location.clone(),
309            HydroNode::FlatMap {
310                f,
311                input: Box::new(self.ir_node.into_inner()),
312                metadata: self.location.new_node_metadata::<U>(),
313            },
314        )
315    }
316
317    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
318    where
319        T: IntoIterator<Item = U>,
320    {
321        self.flat_map_ordered(q!(|v| v))
322    }
323
324    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
325    where
326        T: IntoIterator<Item = U>,
327    {
328        self.flat_map_unordered(q!(|v| v))
329    }
330
331    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
332    where
333        F: Fn(&T) -> bool + 'a,
334    {
335        let f = f.splice_fn1_borrow_ctx(&self.location).into();
336        Optional::new(
337            self.location.clone(),
338            HydroNode::Filter {
339                f,
340                input: Box::new(self.ir_node.into_inner()),
341                metadata: self.location.new_node_metadata::<T>(),
342            },
343        )
344    }
345
346    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
347    where
348        F: Fn(T) -> Option<U> + 'a,
349    {
350        let f = f.splice_fn1_ctx(&self.location).into();
351        Optional::new(
352            self.location.clone(),
353            HydroNode::FilterMap {
354                f,
355                input: Box::new(self.ir_node.into_inner()),
356                metadata: self.location.new_node_metadata::<U>(),
357            },
358        )
359    }
360
361    pub fn union(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
362        check_matching_location(&self.location, &other.location);
363
364        if L::is_top_level() {
365            Optional::new(
366                self.location.clone(),
367                HydroNode::Persist {
368                    inner: Box::new(HydroNode::Chain {
369                        first: Box::new(HydroNode::Unpersist {
370                            inner: Box::new(self.ir_node.into_inner()),
371                            metadata: self.location.new_node_metadata::<T>(),
372                        }),
373                        second: Box::new(HydroNode::Unpersist {
374                            inner: Box::new(other.ir_node.into_inner()),
375                            metadata: self.location.new_node_metadata::<T>(),
376                        }),
377                        metadata: self.location.new_node_metadata::<T>(),
378                    }),
379                    metadata: self.location.new_node_metadata::<T>(),
380                },
381            )
382        } else {
383            Optional::new(
384                self.location.clone(),
385                HydroNode::Chain {
386                    first: Box::new(self.ir_node.into_inner()),
387                    second: Box::new(other.ir_node.into_inner()),
388                    metadata: self.location.new_node_metadata::<T>(),
389                },
390            )
391        }
392    }
393
394    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
395    where
396        O: Clone,
397    {
398        let other: Optional<O, L, B> = other.into();
399        check_matching_location(&self.location, &other.location);
400
401        if L::is_top_level() {
402            Optional::new(
403                self.location.clone(),
404                HydroNode::Persist {
405                    inner: Box::new(HydroNode::CrossSingleton {
406                        left: Box::new(HydroNode::Unpersist {
407                            inner: Box::new(self.ir_node.into_inner()),
408                            metadata: self.location.new_node_metadata::<T>(),
409                        }),
410                        right: Box::new(HydroNode::Unpersist {
411                            inner: Box::new(other.ir_node.into_inner()),
412                            metadata: self.location.new_node_metadata::<O>(),
413                        }),
414                        metadata: self.location.new_node_metadata::<(T, O)>(),
415                    }),
416                    metadata: self.location.new_node_metadata::<(T, O)>(),
417                },
418            )
419        } else {
420            Optional::new(
421                self.location.clone(),
422                HydroNode::CrossSingleton {
423                    left: Box::new(self.ir_node.into_inner()),
424                    right: Box::new(other.ir_node.into_inner()),
425                    metadata: self.location.new_node_metadata::<(T, O)>(),
426                },
427            )
428        }
429    }
430
431    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
432        check_matching_location(&self.location, &other.location);
433
434        if L::is_top_level() {
435            Singleton::new(
436                self.location.clone(),
437                HydroNode::Persist {
438                    inner: Box::new(HydroNode::Chain {
439                        first: Box::new(HydroNode::Unpersist {
440                            inner: Box::new(self.ir_node.into_inner()),
441                            metadata: self.location.new_node_metadata::<T>(),
442                        }),
443                        second: Box::new(HydroNode::Unpersist {
444                            inner: Box::new(other.ir_node.into_inner()),
445                            metadata: self.location.new_node_metadata::<T>(),
446                        }),
447                        metadata: self.location.new_node_metadata::<T>(),
448                    }),
449                    metadata: self.location.new_node_metadata::<T>(),
450                },
451            )
452        } else {
453            Singleton::new(
454                self.location.clone(),
455                HydroNode::Chain {
456                    first: Box::new(self.ir_node.into_inner()),
457                    second: Box::new(other.ir_node.into_inner()),
458                    metadata: self.location.new_node_metadata::<T>(),
459                },
460            )
461        }
462    }
463
464    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
465    where
466        T: Clone,
467    {
468        let none: syn::Expr = parse_quote!([::std::option::Option::None]);
469        let core_ir = HydroNode::Persist {
470            inner: Box::new(HydroNode::Source {
471                source: HydroSource::Iter(none.into()),
472                location_kind: self.location.id().root().clone(),
473                metadata: self.location.new_node_metadata::<Option<T>>(),
474            }),
475            metadata: self.location.new_node_metadata::<Option<T>>(),
476        };
477
478        let none_singleton = if L::is_top_level() {
479            Singleton::new(
480                self.location.clone(),
481                HydroNode::Persist {
482                    inner: Box::new(core_ir),
483                    metadata: self.location.new_node_metadata::<Option<T>>(),
484                },
485            )
486        } else {
487            Singleton::new(self.location.clone(), core_ir)
488        };
489
490        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
491    }
492}
493
494impl<'a, T, L> Optional<T, L, Bounded>
495where
496    L: Location<'a>,
497{
498    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
499        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
500    }
501
502    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
503        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
504    }
505
506    pub fn then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded>
507    where
508        Singleton<U, L, Bounded>: ZipResult<
509                'a,
510                Optional<(), L, Bounded>,
511                Location = L,
512                Out = Optional<(U, ()), L, Bounded>,
513            >,
514    {
515        value.continue_if(self)
516    }
517
518    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
519        if L::is_top_level() {
520            panic!("Converting an optional to a stream is not yet supported at the top level");
521        }
522
523        Stream::new(self.location, self.ir_node.into_inner())
524    }
525}
526
527impl<'a, T, L, B> Optional<T, Atomic<L>, B>
528where
529    L: Location<'a> + NoTick,
530{
531    /// Returns an optional value corresponding to the latest snapshot of the optional
532    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
533    /// at least all relevant data that contributed to the snapshot at tick `t`.
534    ///
535    /// # Safety
536    /// Because this picks a snapshot of a optional whose value is continuously changing,
537    /// the output optional has a non-deterministic value since the snapshot can be at an
538    /// arbitrary point in time.
539    pub unsafe fn latest_tick(self) -> Optional<T, Tick<L>, Bounded> {
540        Optional::new(
541            self.location.clone().tick,
542            HydroNode::Unpersist {
543                inner: Box::new(self.ir_node.into_inner()),
544                metadata: self.location.new_node_metadata::<T>(),
545            },
546        )
547    }
548
549    pub fn end_atomic(self) -> Optional<T, L, B> {
550        Optional::new(self.location.tick.l, self.ir_node.into_inner())
551    }
552}
553
554impl<'a, T, L, B> Optional<T, L, B>
555where
556    L: Location<'a> + NoTick + NoAtomic,
557{
558    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
559        Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
560    }
561
562    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
563    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
564    /// relevant data that contributed to the snapshot at tick `t`.
565    ///
566    /// # Safety
567    /// Because this picks a snapshot of a optional whose value is continuously changing,
568    /// the output optional has a non-deterministic value since the snapshot can be at an
569    /// arbitrary point in time.
570    pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded> {
571        unsafe { self.atomic(tick).latest_tick() }
572    }
573
574    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
575    /// with order corresponding to increasing prefixes of data contributing to the optional.
576    ///
577    /// # Safety
578    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
579    /// to non-deterministic batching and arrival of inputs, the output stream is
580    /// non-deterministic.
581    pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
582        let tick = self.location.tick();
583
584        unsafe {
585            // SAFETY: source of intentional non-determinism
586            self.latest_tick(&tick).all_ticks().weakest_retries()
587        }
588    }
589
590    /// Given a time interval, returns a stream corresponding to snapshots of the optional
591    /// value taken at various points in time. Because the input optional may be
592    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
593    /// represent the value of the optional given some prefix of the streams leading up to
594    /// it.
595    ///
596    /// # Safety
597    /// The output stream is non-deterministic in which elements are sampled, since this
598    /// is controlled by a clock.
599    pub unsafe fn sample_every(
600        self,
601        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
602    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
603        let samples = unsafe {
604            // SAFETY: source of intentional non-determinism
605            self.location.source_interval(interval)
606        };
607        let tick = self.location.tick();
608
609        unsafe {
610            // SAFETY: source of intentional non-determinism
611            self.latest_tick(&tick)
612                .continue_if(samples.tick_batch(&tick).first())
613                .all_ticks()
614                .weakest_retries()
615        }
616    }
617}
618
619impl<'a, T, L> Optional<T, Tick<L>, Bounded>
620where
621    L: Location<'a>,
622{
623    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
624        Stream::new(
625            self.location.outer().clone(),
626            HydroNode::Persist {
627                inner: Box::new(self.ir_node.into_inner()),
628                metadata: self.location.new_node_metadata::<T>(),
629            },
630        )
631    }
632
633    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
634        Stream::new(
635            Atomic {
636                tick: self.location.clone(),
637            },
638            HydroNode::Persist {
639                inner: Box::new(self.ir_node.into_inner()),
640                metadata: self.location.new_node_metadata::<T>(),
641            },
642        )
643    }
644
645    pub fn latest(self) -> Optional<T, L, Unbounded> {
646        Optional::new(
647            self.location.outer().clone(),
648            HydroNode::Persist {
649                inner: Box::new(self.ir_node.into_inner()),
650                metadata: self.location.new_node_metadata::<T>(),
651            },
652        )
653    }
654
655    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
656        Optional::new(
657            Atomic {
658                tick: self.location.clone(),
659            },
660            HydroNode::Persist {
661                inner: Box::new(self.ir_node.into_inner()),
662                metadata: self.location.new_node_metadata::<T>(),
663            },
664        )
665    }
666
667    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
668        Optional::new(
669            self.location.clone(),
670            HydroNode::DeferTick {
671                input: Box::new(self.ir_node.into_inner()),
672                metadata: self.location.new_node_metadata::<T>(),
673            },
674        )
675    }
676
677    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
678        Stream::new(
679            self.location.clone(),
680            HydroNode::Persist {
681                inner: Box::new(self.ir_node.into_inner()),
682                metadata: self.location.new_node_metadata::<T>(),
683            },
684        )
685    }
686
687    pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
688        Optional::new(
689            self.location.clone(),
690            HydroNode::Delta {
691                inner: Box::new(self.ir_node.into_inner()),
692                metadata: self.location.new_node_metadata::<T>(),
693            },
694        )
695    }
696}