1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7use syn::parse_quote;
8
9use crate::builder::FLOW_USED_MESSAGE;
10use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
11use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode};
12use crate::location::tick::{Atomic, NoAtomic};
13use crate::location::{LocationId, NoTick, check_matching_location};
14use crate::singleton::ZipResult;
15use crate::stream::{AtLeastOnce, ExactlyOnce, NoOrder};
16use crate::{Bounded, Location, Singleton, Stream, Tick, TotalOrder, Unbounded};
17
/// Handle to a value of type `Type` that may be absent, associated with the
/// location `Loc` and tagged with the marker type `Bound`.
///
/// The struct does not hold the value itself; it wraps the IR node that
/// describes how the value is produced.
pub struct Optional<Type, Loc, Bound> {
    /// Location handle this optional is attached to.
    pub(crate) location: Loc,
    /// IR node describing this optional. Held in a `RefCell` so it can be
    /// replaced through a shared reference (the `Clone` impl in this file
    /// swaps it for a `Tee` node).
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker that ties the type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
24
25impl<'a, T, L, B> Optional<T, L, B>
26where
27 L: Location<'a>,
28{
29 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
30 Optional {
31 location,
32 ir_node: RefCell::new(ir_node),
33 _phantom: PhantomData,
34 }
35 }
36
37 pub fn some(singleton: Singleton<T, L, B>) -> Self {
38 Optional::new(singleton.location, singleton.ir_node.into_inner())
39 }
40
41 fn location_kind(&self) -> LocationId {
42 self.location.id()
43 }
44}
45
46impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
47where
48 L: Location<'a>,
49{
50 fn defer_tick(self) -> Self {
51 Optional::defer_tick(self)
52 }
53}
54
55impl<'a, T, L> CycleCollection<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
56where
57 L: Location<'a>,
58{
59 type Location = Tick<L>;
60
61 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
62 let location_id = location.id();
63 Optional::new(
64 location.clone(),
65 HydroNode::CycleSource {
66 ident,
67 location_kind: location_id,
68 metadata: location.new_node_metadata::<T>(),
69 },
70 )
71 }
72}
73
74impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
75where
76 L: Location<'a>,
77{
78 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
79 assert_eq!(
80 self.location.id(),
81 expected_location,
82 "locations do not match"
83 );
84 self.location
85 .flow_state()
86 .borrow_mut()
87 .leaves
88 .as_mut()
89 .expect(FLOW_USED_MESSAGE)
90 .push(HydroLeaf::CycleSink {
91 ident,
92 location_kind: self.location_kind(),
93 input: Box::new(self.ir_node.into_inner()),
94 metadata: self.location.new_node_metadata::<T>(),
95 });
96 }
97}
98
99impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
100where
101 L: Location<'a>,
102{
103 type Location = Tick<L>;
104
105 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
106 let location_id = location.id();
107 Optional::new(
108 location.clone(),
109 HydroNode::CycleSource {
110 ident,
111 location_kind: location_id,
112 metadata: location.new_node_metadata::<T>(),
113 },
114 )
115 }
116}
117
118impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
119where
120 L: Location<'a>,
121{
122 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
123 assert_eq!(
124 self.location.id(),
125 expected_location,
126 "locations do not match"
127 );
128 self.location
129 .flow_state()
130 .borrow_mut()
131 .leaves
132 .as_mut()
133 .expect(FLOW_USED_MESSAGE)
134 .push(HydroLeaf::CycleSink {
135 ident,
136 location_kind: self.location_kind(),
137 input: Box::new(self.ir_node.into_inner()),
138 metadata: self.location.new_node_metadata::<T>(),
139 });
140 }
141}
142
143impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Optional<T, L, B>
144where
145 L: Location<'a> + NoTick,
146{
147 type Location = L;
148
149 fn create_source(ident: syn::Ident, location: L) -> Self {
150 let location_id = location.id();
151 Optional::new(
152 location.clone(),
153 HydroNode::Persist {
154 inner: Box::new(HydroNode::CycleSource {
155 ident,
156 location_kind: location_id,
157 metadata: location.new_node_metadata::<T>(),
158 }),
159 metadata: location.new_node_metadata::<T>(),
160 },
161 )
162 }
163}
164
165impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Optional<T, L, B>
166where
167 L: Location<'a> + NoTick,
168{
169 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
170 assert_eq!(
171 self.location.id(),
172 expected_location,
173 "locations do not match"
174 );
175 let metadata = self.location.new_node_metadata::<T>();
176 self.location
177 .flow_state()
178 .borrow_mut()
179 .leaves
180 .as_mut()
181 .expect(FLOW_USED_MESSAGE)
182 .push(HydroLeaf::CycleSink {
183 ident,
184 location_kind: self.location_kind(),
185 input: Box::new(HydroNode::Unpersist {
186 inner: Box::new(self.ir_node.into_inner()),
187 metadata: metadata.clone(),
188 }),
189 metadata,
190 });
191 }
192}
193
194impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
195where
196 L: Location<'a>,
197{
198 fn from(singleton: Optional<T, L, Bounded>) -> Self {
199 Optional::new(singleton.location, singleton.ir_node.into_inner())
200 }
201}
202
203impl<'a, T, L, B> From<Singleton<T, L, B>> for Optional<T, L, B>
204where
205 L: Location<'a>,
206{
207 fn from(singleton: Singleton<T, L, B>) -> Self {
208 Optional::some(singleton)
209 }
210}
211
212impl<'a, T, L, B> Clone for Optional<T, L, B>
213where
214 T: Clone,
215 L: Location<'a>,
216{
217 fn clone(&self) -> Self {
218 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
219 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
220 *self.ir_node.borrow_mut() = HydroNode::Tee {
221 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
222 metadata: self.location.new_node_metadata::<T>(),
223 };
224 }
225
226 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
227 Optional {
228 location: self.location.clone(),
229 ir_node: HydroNode::Tee {
230 inner: TeeNode(inner.0.clone()),
231 metadata: metadata.clone(),
232 }
233 .into(),
234 _phantom: PhantomData,
235 }
236 } else {
237 unreachable!()
238 }
239 }
240}
241
242impl<'a, T, L, B> Optional<T, L, B>
243where
244 L: Location<'a>,
245{
246 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
265 where
266 F: Fn(T) -> U + 'a,
267 {
268 let f = f.splice_fn1_ctx(&self.location).into();
269 Optional::new(
270 self.location.clone(),
271 HydroNode::Map {
272 f,
273 input: Box::new(self.ir_node.into_inner()),
274 metadata: self.location.new_node_metadata::<U>(),
275 },
276 )
277 }
278
279 pub fn flat_map_ordered<U, I, F>(
280 self,
281 f: impl IntoQuotedMut<'a, F, L>,
282 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
283 where
284 I: IntoIterator<Item = U>,
285 F: Fn(T) -> I + 'a,
286 {
287 let f = f.splice_fn1_ctx(&self.location).into();
288 Stream::new(
289 self.location.clone(),
290 HydroNode::FlatMap {
291 f,
292 input: Box::new(self.ir_node.into_inner()),
293 metadata: self.location.new_node_metadata::<U>(),
294 },
295 )
296 }
297
298 pub fn flat_map_unordered<U, I, F>(
299 self,
300 f: impl IntoQuotedMut<'a, F, L>,
301 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
302 where
303 I: IntoIterator<Item = U>,
304 F: Fn(T) -> I + 'a,
305 {
306 let f = f.splice_fn1_ctx(&self.location).into();
307 Stream::new(
308 self.location.clone(),
309 HydroNode::FlatMap {
310 f,
311 input: Box::new(self.ir_node.into_inner()),
312 metadata: self.location.new_node_metadata::<U>(),
313 },
314 )
315 }
316
317 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
318 where
319 T: IntoIterator<Item = U>,
320 {
321 self.flat_map_ordered(q!(|v| v))
322 }
323
324 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
325 where
326 T: IntoIterator<Item = U>,
327 {
328 self.flat_map_unordered(q!(|v| v))
329 }
330
331 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
332 where
333 F: Fn(&T) -> bool + 'a,
334 {
335 let f = f.splice_fn1_borrow_ctx(&self.location).into();
336 Optional::new(
337 self.location.clone(),
338 HydroNode::Filter {
339 f,
340 input: Box::new(self.ir_node.into_inner()),
341 metadata: self.location.new_node_metadata::<T>(),
342 },
343 )
344 }
345
346 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
347 where
348 F: Fn(T) -> Option<U> + 'a,
349 {
350 let f = f.splice_fn1_ctx(&self.location).into();
351 Optional::new(
352 self.location.clone(),
353 HydroNode::FilterMap {
354 f,
355 input: Box::new(self.ir_node.into_inner()),
356 metadata: self.location.new_node_metadata::<U>(),
357 },
358 )
359 }
360
361 pub fn union(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
362 check_matching_location(&self.location, &other.location);
363
364 if L::is_top_level() {
365 Optional::new(
366 self.location.clone(),
367 HydroNode::Persist {
368 inner: Box::new(HydroNode::Chain {
369 first: Box::new(HydroNode::Unpersist {
370 inner: Box::new(self.ir_node.into_inner()),
371 metadata: self.location.new_node_metadata::<T>(),
372 }),
373 second: Box::new(HydroNode::Unpersist {
374 inner: Box::new(other.ir_node.into_inner()),
375 metadata: self.location.new_node_metadata::<T>(),
376 }),
377 metadata: self.location.new_node_metadata::<T>(),
378 }),
379 metadata: self.location.new_node_metadata::<T>(),
380 },
381 )
382 } else {
383 Optional::new(
384 self.location.clone(),
385 HydroNode::Chain {
386 first: Box::new(self.ir_node.into_inner()),
387 second: Box::new(other.ir_node.into_inner()),
388 metadata: self.location.new_node_metadata::<T>(),
389 },
390 )
391 }
392 }
393
394 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
395 where
396 O: Clone,
397 {
398 let other: Optional<O, L, B> = other.into();
399 check_matching_location(&self.location, &other.location);
400
401 if L::is_top_level() {
402 Optional::new(
403 self.location.clone(),
404 HydroNode::Persist {
405 inner: Box::new(HydroNode::CrossSingleton {
406 left: Box::new(HydroNode::Unpersist {
407 inner: Box::new(self.ir_node.into_inner()),
408 metadata: self.location.new_node_metadata::<T>(),
409 }),
410 right: Box::new(HydroNode::Unpersist {
411 inner: Box::new(other.ir_node.into_inner()),
412 metadata: self.location.new_node_metadata::<O>(),
413 }),
414 metadata: self.location.new_node_metadata::<(T, O)>(),
415 }),
416 metadata: self.location.new_node_metadata::<(T, O)>(),
417 },
418 )
419 } else {
420 Optional::new(
421 self.location.clone(),
422 HydroNode::CrossSingleton {
423 left: Box::new(self.ir_node.into_inner()),
424 right: Box::new(other.ir_node.into_inner()),
425 metadata: self.location.new_node_metadata::<(T, O)>(),
426 },
427 )
428 }
429 }
430
431 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
432 check_matching_location(&self.location, &other.location);
433
434 if L::is_top_level() {
435 Singleton::new(
436 self.location.clone(),
437 HydroNode::Persist {
438 inner: Box::new(HydroNode::Chain {
439 first: Box::new(HydroNode::Unpersist {
440 inner: Box::new(self.ir_node.into_inner()),
441 metadata: self.location.new_node_metadata::<T>(),
442 }),
443 second: Box::new(HydroNode::Unpersist {
444 inner: Box::new(other.ir_node.into_inner()),
445 metadata: self.location.new_node_metadata::<T>(),
446 }),
447 metadata: self.location.new_node_metadata::<T>(),
448 }),
449 metadata: self.location.new_node_metadata::<T>(),
450 },
451 )
452 } else {
453 Singleton::new(
454 self.location.clone(),
455 HydroNode::Chain {
456 first: Box::new(self.ir_node.into_inner()),
457 second: Box::new(other.ir_node.into_inner()),
458 metadata: self.location.new_node_metadata::<T>(),
459 },
460 )
461 }
462 }
463
464 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
465 where
466 T: Clone,
467 {
468 let none: syn::Expr = parse_quote!([::std::option::Option::None]);
469 let core_ir = HydroNode::Persist {
470 inner: Box::new(HydroNode::Source {
471 source: HydroSource::Iter(none.into()),
472 location_kind: self.location.id().root().clone(),
473 metadata: self.location.new_node_metadata::<Option<T>>(),
474 }),
475 metadata: self.location.new_node_metadata::<Option<T>>(),
476 };
477
478 let none_singleton = if L::is_top_level() {
479 Singleton::new(
480 self.location.clone(),
481 HydroNode::Persist {
482 inner: Box::new(core_ir),
483 metadata: self.location.new_node_metadata::<Option<T>>(),
484 },
485 )
486 } else {
487 Singleton::new(self.location.clone(), core_ir)
488 };
489
490 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
491 }
492}
493
494impl<'a, T, L> Optional<T, L, Bounded>
495where
496 L: Location<'a>,
497{
498 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
499 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
500 }
501
502 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
503 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
504 }
505
506 pub fn then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded>
507 where
508 Singleton<U, L, Bounded>: ZipResult<
509 'a,
510 Optional<(), L, Bounded>,
511 Location = L,
512 Out = Optional<(U, ()), L, Bounded>,
513 >,
514 {
515 value.continue_if(self)
516 }
517
518 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
519 if L::is_top_level() {
520 panic!("Converting an optional to a stream is not yet supported at the top level");
521 }
522
523 Stream::new(self.location, self.ir_node.into_inner())
524 }
525}
526
527impl<'a, T, L, B> Optional<T, Atomic<L>, B>
528where
529 L: Location<'a> + NoTick,
530{
531 pub unsafe fn latest_tick(self) -> Optional<T, Tick<L>, Bounded> {
540 Optional::new(
541 self.location.clone().tick,
542 HydroNode::Unpersist {
543 inner: Box::new(self.ir_node.into_inner()),
544 metadata: self.location.new_node_metadata::<T>(),
545 },
546 )
547 }
548
549 pub fn end_atomic(self) -> Optional<T, L, B> {
550 Optional::new(self.location.tick.l, self.ir_node.into_inner())
551 }
552}
553
554impl<'a, T, L, B> Optional<T, L, B>
555where
556 L: Location<'a> + NoTick + NoAtomic,
557{
558 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
559 Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
560 }
561
562 pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded> {
571 unsafe { self.atomic(tick).latest_tick() }
572 }
573
574 pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
582 let tick = self.location.tick();
583
584 unsafe {
585 self.latest_tick(&tick).all_ticks().weakest_retries()
587 }
588 }
589
590 pub unsafe fn sample_every(
600 self,
601 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
602 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
603 let samples = unsafe {
604 self.location.source_interval(interval)
606 };
607 let tick = self.location.tick();
608
609 unsafe {
610 self.latest_tick(&tick)
612 .continue_if(samples.tick_batch(&tick).first())
613 .all_ticks()
614 .weakest_retries()
615 }
616 }
617}
618
619impl<'a, T, L> Optional<T, Tick<L>, Bounded>
620where
621 L: Location<'a>,
622{
623 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
624 Stream::new(
625 self.location.outer().clone(),
626 HydroNode::Persist {
627 inner: Box::new(self.ir_node.into_inner()),
628 metadata: self.location.new_node_metadata::<T>(),
629 },
630 )
631 }
632
633 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
634 Stream::new(
635 Atomic {
636 tick: self.location.clone(),
637 },
638 HydroNode::Persist {
639 inner: Box::new(self.ir_node.into_inner()),
640 metadata: self.location.new_node_metadata::<T>(),
641 },
642 )
643 }
644
645 pub fn latest(self) -> Optional<T, L, Unbounded> {
646 Optional::new(
647 self.location.outer().clone(),
648 HydroNode::Persist {
649 inner: Box::new(self.ir_node.into_inner()),
650 metadata: self.location.new_node_metadata::<T>(),
651 },
652 )
653 }
654
655 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
656 Optional::new(
657 Atomic {
658 tick: self.location.clone(),
659 },
660 HydroNode::Persist {
661 inner: Box::new(self.ir_node.into_inner()),
662 metadata: self.location.new_node_metadata::<T>(),
663 },
664 )
665 }
666
667 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
668 Optional::new(
669 self.location.clone(),
670 HydroNode::DeferTick {
671 input: Box::new(self.ir_node.into_inner()),
672 metadata: self.location.new_node_metadata::<T>(),
673 },
674 )
675 }
676
677 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
678 Stream::new(
679 self.location.clone(),
680 HydroNode::Persist {
681 inner: Box::new(self.ir_node.into_inner()),
682 metadata: self.location.new_node_metadata::<T>(),
683 },
684 )
685 }
686
687 pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
688 Optional::new(
689 self.location.clone(),
690 HydroNode::Delta {
691 inner: Box::new(self.ir_node.into_inner()),
692 metadata: self.location.new_node_metadata::<T>(),
693 },
694 )
695 }
696}