hydro_lang/
optional.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7use syn::parse_quote;
8
9use crate::builder::FLOW_USED_MESSAGE;
10use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
11use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode};
12use crate::location::tick::{Atomic, NoAtomic};
13use crate::location::{LocationId, NoTick, check_matching_location};
14use crate::singleton::ZipResult;
15use crate::stream::NoOrder;
16use crate::{Bounded, Location, Singleton, Stream, Tick, Unbounded};
17
/// A value that may or may not be present, associated with a location `L` and a
/// boundedness marker `B`.
///
/// The value itself is not stored here: the struct carries the IR node that describes
/// how the value is produced.
pub struct Optional<T, L, B> {
    /// Location this optional is associated with.
    pub(crate) location: L,
    /// IR node producing the value. It sits in a `RefCell` so that it can be swapped in
    /// place through a shared reference (the `Clone` impl in this file does exactly that).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Zero-sized marker tying the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(T, L, B)>,
}
24
25impl<'a, T, L: Location<'a>, B> Optional<T, L, B> {
26    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
27        Optional {
28            location,
29            ir_node: RefCell::new(ir_node),
30            _phantom: PhantomData,
31        }
32    }
33
34    pub fn some(singleton: Singleton<T, L, B>) -> Self {
35        Optional::new(singleton.location, singleton.ir_node.into_inner())
36    }
37
38    fn location_kind(&self) -> LocationId {
39        self.location.id()
40    }
41}
42
impl<'a, T, L: Location<'a>> DeferTick for Optional<T, Tick<L>, Bounded> {
    /// Implements the trait by delegating to the inherent `defer_tick` method defined
    /// for `Optional<T, Tick<L>, Bounded>` in this file.
    fn defer_tick(self) -> Self {
        // Path-style call: inherent associated functions take priority over trait
        // methods during resolution, so this reaches the inherent method rather than
        // recursing into this trait method.
        Optional::defer_tick(self)
    }
}
48
49impl<'a, T, L: Location<'a>> CycleCollection<'a, TickCycleMarker>
50    for Optional<T, Tick<L>, Bounded>
51{
52    type Location = Tick<L>;
53
54    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
55        let location_id = location.id();
56        Optional::new(
57            location.clone(),
58            HydroNode::CycleSource {
59                ident,
60                location_kind: location_id,
61                metadata: location.new_node_metadata::<T>(),
62            },
63        )
64    }
65}
66
67impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded> {
68    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
69        assert_eq!(
70            self.location.id(),
71            expected_location,
72            "locations do not match"
73        );
74        self.location
75            .flow_state()
76            .borrow_mut()
77            .leaves
78            .as_mut()
79            .expect(FLOW_USED_MESSAGE)
80            .push(HydroLeaf::CycleSink {
81                ident,
82                location_kind: self.location_kind(),
83                input: Box::new(self.ir_node.into_inner()),
84                metadata: self.location.new_node_metadata::<T>(),
85            });
86    }
87}
88
89impl<'a, T, L: Location<'a>> CycleCollection<'a, ForwardRefMarker>
90    for Optional<T, Tick<L>, Bounded>
91{
92    type Location = Tick<L>;
93
94    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
95        let location_id = location.id();
96        Optional::new(
97            location.clone(),
98            HydroNode::CycleSource {
99                ident,
100                location_kind: location_id,
101                metadata: location.new_node_metadata::<T>(),
102            },
103        )
104    }
105}
106
107impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded> {
108    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
109        assert_eq!(
110            self.location.id(),
111            expected_location,
112            "locations do not match"
113        );
114        self.location
115            .flow_state()
116            .borrow_mut()
117            .leaves
118            .as_mut()
119            .expect(FLOW_USED_MESSAGE)
120            .push(HydroLeaf::CycleSink {
121                ident,
122                location_kind: self.location_kind(),
123                input: Box::new(self.ir_node.into_inner()),
124                metadata: self.location.new_node_metadata::<T>(),
125            });
126    }
127}
128
129impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRefMarker>
130    for Optional<T, L, B>
131{
132    type Location = L;
133
134    fn create_source(ident: syn::Ident, location: L) -> Self {
135        let location_id = location.id();
136        Optional::new(
137            location.clone(),
138            HydroNode::Persist {
139                inner: Box::new(HydroNode::CycleSource {
140                    ident,
141                    location_kind: location_id,
142                    metadata: location.new_node_metadata::<T>(),
143                }),
144                metadata: location.new_node_metadata::<T>(),
145            },
146        )
147    }
148}
149
150impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker> for Optional<T, L, B> {
151    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
152        assert_eq!(
153            self.location.id(),
154            expected_location,
155            "locations do not match"
156        );
157        let metadata = self.location.new_node_metadata::<T>();
158        self.location
159            .flow_state()
160            .borrow_mut()
161            .leaves
162            .as_mut()
163            .expect(FLOW_USED_MESSAGE)
164            .push(HydroLeaf::CycleSink {
165                ident,
166                location_kind: self.location_kind(),
167                input: Box::new(HydroNode::Unpersist {
168                    inner: Box::new(self.ir_node.into_inner()),
169                    metadata: metadata.clone(),
170                }),
171                metadata,
172            });
173    }
174}
175
176impl<'a, T, L: Location<'a>> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded> {
177    fn from(singleton: Optional<T, L, Bounded>) -> Self {
178        Optional::new(singleton.location, singleton.ir_node.into_inner())
179    }
180}
181
182impl<'a, T, L: Location<'a>, B> From<Singleton<T, L, B>> for Optional<T, L, B> {
183    fn from(singleton: Singleton<T, L, B>) -> Self {
184        Optional::some(singleton)
185    }
186}
187
188impl<'a, T: Clone, L: Location<'a>, B> Clone for Optional<T, L, B> {
189    fn clone(&self) -> Self {
190        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
191            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
192            *self.ir_node.borrow_mut() = HydroNode::Tee {
193                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
194                metadata: self.location.new_node_metadata::<T>(),
195            };
196        }
197
198        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
199            Optional {
200                location: self.location.clone(),
201                ir_node: HydroNode::Tee {
202                    inner: TeeNode(inner.0.clone()),
203                    metadata: metadata.clone(),
204                }
205                .into(),
206                _phantom: PhantomData,
207            }
208        } else {
209            unreachable!()
210        }
211    }
212}
213
214impl<'a, T, L: Location<'a>, B> Optional<T, L, B> {
215    /// Transforms the optional value by applying a function `f` to it,
216    /// continuously as the input is updated.
217    ///
218    /// Whenever the optional is empty, the output optional is also empty.
219    ///
220    /// # Example
221    /// ```rust
222    /// # use hydro_lang::*;
223    /// # use dfir_rs::futures::StreamExt;
224    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
225    /// let tick = process.tick();
226    /// let optional = tick.optional_first_tick(q!(1));
227    /// optional.map(q!(|v| v + 1)).all_ticks()
228    /// # }, |mut stream| async move {
229    /// // 2
230    /// # assert_eq!(stream.next().await.unwrap(), 2);
231    /// # }));
232    /// ```
233    pub fn map<U, F: Fn(T) -> U + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B> {
234        let f = f.splice_fn1_ctx(&self.location).into();
235        Optional::new(
236            self.location.clone(),
237            HydroNode::Map {
238                f,
239                input: Box::new(self.ir_node.into_inner()),
240                metadata: self.location.new_node_metadata::<U>(),
241            },
242        )
243    }
244
245    pub fn flat_map_ordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
246        self,
247        f: impl IntoQuotedMut<'a, F, L>,
248    ) -> Stream<U, L, B> {
249        let f = f.splice_fn1_ctx(&self.location).into();
250        Stream::new(
251            self.location.clone(),
252            HydroNode::FlatMap {
253                f,
254                input: Box::new(self.ir_node.into_inner()),
255                metadata: self.location.new_node_metadata::<U>(),
256            },
257        )
258    }
259
260    pub fn flat_map_unordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
261        self,
262        f: impl IntoQuotedMut<'a, F, L>,
263    ) -> Stream<U, L, B, NoOrder> {
264        let f = f.splice_fn1_ctx(&self.location).into();
265        Stream::new(
266            self.location.clone(),
267            HydroNode::FlatMap {
268                f,
269                input: Box::new(self.ir_node.into_inner()),
270                metadata: self.location.new_node_metadata::<U>(),
271            },
272        )
273    }
274
275    pub fn flatten_ordered<U>(self) -> Stream<U, L, B>
276    where
277        T: IntoIterator<Item = U>,
278    {
279        self.flat_map_ordered(q!(|v| v))
280    }
281
282    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder>
283    where
284        T: IntoIterator<Item = U>,
285    {
286        self.flat_map_unordered(q!(|v| v))
287    }
288
289    pub fn filter<F: Fn(&T) -> bool + 'a>(
290        self,
291        f: impl IntoQuotedMut<'a, F, L>,
292    ) -> Optional<T, L, B> {
293        let f = f.splice_fn1_borrow_ctx(&self.location).into();
294        Optional::new(
295            self.location.clone(),
296            HydroNode::Filter {
297                f,
298                input: Box::new(self.ir_node.into_inner()),
299                metadata: self.location.new_node_metadata::<T>(),
300            },
301        )
302    }
303
304    pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
305        self,
306        f: impl IntoQuotedMut<'a, F, L>,
307    ) -> Optional<U, L, B> {
308        let f = f.splice_fn1_ctx(&self.location).into();
309        Optional::new(
310            self.location.clone(),
311            HydroNode::FilterMap {
312                f,
313                input: Box::new(self.ir_node.into_inner()),
314                metadata: self.location.new_node_metadata::<U>(),
315            },
316        )
317    }
318
319    pub fn union(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
320        check_matching_location(&self.location, &other.location);
321
322        if L::is_top_level() {
323            Optional::new(
324                self.location.clone(),
325                HydroNode::Persist {
326                    inner: Box::new(HydroNode::Chain {
327                        first: Box::new(HydroNode::Unpersist {
328                            inner: Box::new(self.ir_node.into_inner()),
329                            metadata: self.location.new_node_metadata::<T>(),
330                        }),
331                        second: Box::new(HydroNode::Unpersist {
332                            inner: Box::new(other.ir_node.into_inner()),
333                            metadata: self.location.new_node_metadata::<T>(),
334                        }),
335                        metadata: self.location.new_node_metadata::<T>(),
336                    }),
337                    metadata: self.location.new_node_metadata::<T>(),
338                },
339            )
340        } else {
341            Optional::new(
342                self.location.clone(),
343                HydroNode::Chain {
344                    first: Box::new(self.ir_node.into_inner()),
345                    second: Box::new(other.ir_node.into_inner()),
346                    metadata: self.location.new_node_metadata::<T>(),
347                },
348            )
349        }
350    }
351
352    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
353    where
354        O: Clone,
355    {
356        let other: Optional<O, L, B> = other.into();
357        check_matching_location(&self.location, &other.location);
358
359        if L::is_top_level() {
360            Optional::new(
361                self.location.clone(),
362                HydroNode::Persist {
363                    inner: Box::new(HydroNode::CrossSingleton {
364                        left: Box::new(HydroNode::Unpersist {
365                            inner: Box::new(self.ir_node.into_inner()),
366                            metadata: self.location.new_node_metadata::<T>(),
367                        }),
368                        right: Box::new(HydroNode::Unpersist {
369                            inner: Box::new(other.ir_node.into_inner()),
370                            metadata: self.location.new_node_metadata::<O>(),
371                        }),
372                        metadata: self.location.new_node_metadata::<(T, O)>(),
373                    }),
374                    metadata: self.location.new_node_metadata::<(T, O)>(),
375                },
376            )
377        } else {
378            Optional::new(
379                self.location.clone(),
380                HydroNode::CrossSingleton {
381                    left: Box::new(self.ir_node.into_inner()),
382                    right: Box::new(other.ir_node.into_inner()),
383                    metadata: self.location.new_node_metadata::<(T, O)>(),
384                },
385            )
386        }
387    }
388
389    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
390        check_matching_location(&self.location, &other.location);
391
392        if L::is_top_level() {
393            Singleton::new(
394                self.location.clone(),
395                HydroNode::Persist {
396                    inner: Box::new(HydroNode::Chain {
397                        first: Box::new(HydroNode::Unpersist {
398                            inner: Box::new(self.ir_node.into_inner()),
399                            metadata: self.location.new_node_metadata::<T>(),
400                        }),
401                        second: Box::new(HydroNode::Unpersist {
402                            inner: Box::new(other.ir_node.into_inner()),
403                            metadata: self.location.new_node_metadata::<T>(),
404                        }),
405                        metadata: self.location.new_node_metadata::<T>(),
406                    }),
407                    metadata: self.location.new_node_metadata::<T>(),
408                },
409            )
410        } else {
411            Singleton::new(
412                self.location.clone(),
413                HydroNode::Chain {
414                    first: Box::new(self.ir_node.into_inner()),
415                    second: Box::new(other.ir_node.into_inner()),
416                    metadata: self.location.new_node_metadata::<T>(),
417                },
418            )
419        }
420    }
421
422    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
423    where
424        T: Clone,
425    {
426        let none: syn::Expr = parse_quote!([::std::option::Option::None]);
427        let core_ir = HydroNode::Persist {
428            inner: Box::new(HydroNode::Source {
429                source: HydroSource::Iter(none.into()),
430                location_kind: self.location.id().root().clone(),
431                metadata: self.location.new_node_metadata::<Option<T>>(),
432            }),
433            metadata: self.location.new_node_metadata::<Option<T>>(),
434        };
435
436        let none_singleton = if L::is_top_level() {
437            Singleton::new(
438                self.location.clone(),
439                HydroNode::Persist {
440                    inner: Box::new(core_ir),
441                    metadata: self.location.new_node_metadata::<Option<T>>(),
442                },
443            )
444        } else {
445            Singleton::new(self.location.clone(), core_ir)
446        };
447
448        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
449    }
450}
451
452impl<'a, T, L: Location<'a>> Optional<T, L, Bounded> {
453    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
454        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
455    }
456
457    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
458        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
459    }
460
461    pub fn then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded>
462    where
463        Singleton<U, L, Bounded>: ZipResult<
464                'a,
465                Optional<(), L, Bounded>,
466                Location = L,
467                Out = Optional<(U, ()), L, Bounded>,
468            >,
469    {
470        value.continue_if(self)
471    }
472
473    pub fn into_stream(self) -> Stream<T, L, Bounded> {
474        if L::is_top_level() {
475            panic!("Converting an optional to a stream is not yet supported at the top level");
476        }
477
478        Stream::new(self.location, self.ir_node.into_inner())
479    }
480}
481
482impl<'a, T, L: Location<'a> + NoTick, B> Optional<T, Atomic<L>, B> {
483    /// Returns an optional value corresponding to the latest snapshot of the optional
484    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
485    /// at least all relevant data that contributed to the snapshot at tick `t`.
486    ///
487    /// # Safety
488    /// Because this picks a snapshot of a optional whose value is continuously changing,
489    /// the output optional has a non-deterministic value since the snapshot can be at an
490    /// arbitrary point in time.
491    pub unsafe fn latest_tick(self) -> Optional<T, Tick<L>, Bounded> {
492        Optional::new(
493            self.location.clone().tick,
494            HydroNode::Unpersist {
495                inner: Box::new(self.ir_node.into_inner()),
496                metadata: self.location.new_node_metadata::<T>(),
497            },
498        )
499    }
500
501    pub fn end_atomic(self) -> Optional<T, L, B> {
502        Optional::new(self.location.tick.l, self.ir_node.into_inner())
503    }
504}
505
506impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B> Optional<T, L, B> {
507    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
508        Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
509    }
510
511    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
512    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
513    /// relevant data that contributed to the snapshot at tick `t`.
514    ///
515    /// # Safety
516    /// Because this picks a snapshot of a optional whose value is continuously changing,
517    /// the output optional has a non-deterministic value since the snapshot can be at an
518    /// arbitrary point in time.
519    pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded> {
520        unsafe { self.atomic(tick).latest_tick() }
521    }
522
523    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
524    /// with order corresponding to increasing prefixes of data contributing to the optional.
525    ///
526    /// # Safety
527    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
528    /// to non-deterministic batching and arrival of inputs, the output stream is
529    /// non-deterministic.
530    pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded> {
531        let tick = self.location.tick();
532
533        unsafe {
534            // SAFETY: source of intentional non-determinism
535            self.latest_tick(&tick).all_ticks()
536        }
537    }
538
539    /// Given a time interval, returns a stream corresponding to snapshots of the optional
540    /// value taken at various points in time. Because the input optional may be
541    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
542    /// represent the value of the optional given some prefix of the streams leading up to
543    /// it.
544    ///
545    /// # Safety
546    /// The output stream is non-deterministic in which elements are sampled, since this
547    /// is controlled by a clock.
548    pub unsafe fn sample_every(
549        self,
550        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
551    ) -> Stream<T, L, Unbounded>
552    where
553        L: NoAtomic,
554    {
555        let samples = unsafe {
556            // SAFETY: source of intentional non-determinism
557            self.location.source_interval(interval)
558        };
559        let tick = self.location.tick();
560
561        unsafe {
562            // SAFETY: source of intentional non-determinism
563            self.latest_tick(&tick)
564                .continue_if(samples.tick_batch(&tick).first())
565                .all_ticks()
566        }
567    }
568}
569
570impl<'a, T, L: Location<'a>> Optional<T, Tick<L>, Bounded> {
571    pub fn all_ticks(self) -> Stream<T, L, Unbounded> {
572        Stream::new(
573            self.location.outer().clone(),
574            HydroNode::Persist {
575                inner: Box::new(self.ir_node.into_inner()),
576                metadata: self.location.new_node_metadata::<T>(),
577            },
578        )
579    }
580
581    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded> {
582        Stream::new(
583            Atomic {
584                tick: self.location.clone(),
585            },
586            HydroNode::Persist {
587                inner: Box::new(self.ir_node.into_inner()),
588                metadata: self.location.new_node_metadata::<T>(),
589            },
590        )
591    }
592
593    pub fn latest(self) -> Optional<T, L, Unbounded> {
594        Optional::new(
595            self.location.outer().clone(),
596            HydroNode::Persist {
597                inner: Box::new(self.ir_node.into_inner()),
598                metadata: self.location.new_node_metadata::<T>(),
599            },
600        )
601    }
602
603    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
604        Optional::new(
605            Atomic {
606                tick: self.location.clone(),
607            },
608            HydroNode::Persist {
609                inner: Box::new(self.ir_node.into_inner()),
610                metadata: self.location.new_node_metadata::<T>(),
611            },
612        )
613    }
614
615    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
616        Optional::new(
617            self.location.clone(),
618            HydroNode::DeferTick {
619                input: Box::new(self.ir_node.into_inner()),
620                metadata: self.location.new_node_metadata::<T>(),
621            },
622        )
623    }
624
625    pub fn persist(self) -> Stream<T, Tick<L>, Bounded> {
626        Stream::new(
627            self.location.clone(),
628            HydroNode::Persist {
629                inner: Box::new(self.ir_node.into_inner()),
630                metadata: self.location.new_node_metadata::<T>(),
631            },
632        )
633    }
634
635    pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
636        Optional::new(
637            self.location.clone(),
638            HydroNode::Delta {
639                inner: Box::new(self.ir_node.into_inner()),
640                metadata: self.location.new_node_metadata::<T>(),
641            },
642        )
643    }
644}