1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7use syn::parse_quote;
8
9use crate::boundedness::Boundedness;
10use crate::builder::FLOW_USED_MESSAGE;
11use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
12use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode};
13use crate::location::tick::{Atomic, NoAtomic};
14use crate::location::{LocationId, NoTick, check_matching_location};
15use crate::singleton::ZipResult;
16use crate::stream::{AtLeastOnce, ExactlyOnce, NoOrder};
17use crate::unsafety::NonDet;
18use crate::{Bounded, Location, Singleton, Stream, Tick, TotalOrder, Unbounded};
19
20pub struct Optional<Type, Loc, Bound: Boundedness> {
21 pub(crate) location: Loc,
22 pub(crate) ir_node: RefCell<HydroNode>,
23
24 _phantom: PhantomData<(Type, Loc, Bound)>,
25}
26
27impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
28where
29 L: Location<'a>,
30{
31 fn defer_tick(self) -> Self {
32 Optional::defer_tick(self)
33 }
34}
35
36impl<'a, T, L> CycleCollection<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
37where
38 L: Location<'a>,
39{
40 type Location = Tick<L>;
41
42 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
43 Optional::new(
44 location.clone(),
45 HydroNode::CycleSource {
46 ident,
47 metadata: location.new_node_metadata::<T>(),
48 },
49 )
50 }
51}
52
53impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
54where
55 L: Location<'a>,
56{
57 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
58 assert_eq!(
59 self.location.id(),
60 expected_location,
61 "locations do not match"
62 );
63 self.location
64 .flow_state()
65 .borrow_mut()
66 .leaves
67 .as_mut()
68 .expect(FLOW_USED_MESSAGE)
69 .push(HydroLeaf::CycleSink {
70 ident,
71 input: Box::new(self.ir_node.into_inner()),
72 metadata: self.location.new_node_metadata::<T>(),
73 });
74 }
75}
76
77impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
78where
79 L: Location<'a>,
80{
81 type Location = Tick<L>;
82
83 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
84 Optional::new(
85 location.clone(),
86 HydroNode::CycleSource {
87 ident,
88 metadata: location.new_node_metadata::<T>(),
89 },
90 )
91 }
92}
93
94impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
95where
96 L: Location<'a>,
97{
98 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
99 assert_eq!(
100 self.location.id(),
101 expected_location,
102 "locations do not match"
103 );
104 self.location
105 .flow_state()
106 .borrow_mut()
107 .leaves
108 .as_mut()
109 .expect(FLOW_USED_MESSAGE)
110 .push(HydroLeaf::CycleSink {
111 ident,
112 input: Box::new(self.ir_node.into_inner()),
113 metadata: self.location.new_node_metadata::<T>(),
114 });
115 }
116}
117
118impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRefMarker> for Optional<T, L, B>
119where
120 L: Location<'a> + NoTick,
121{
122 type Location = L;
123
124 fn create_source(ident: syn::Ident, location: L) -> Self {
125 Optional::new(
126 location.clone(),
127 HydroNode::Persist {
128 inner: Box::new(HydroNode::CycleSource {
129 ident,
130 metadata: location.new_node_metadata::<T>(),
131 }),
132 metadata: location.new_node_metadata::<T>(),
133 },
134 )
135 }
136}
137
138impl<'a, T, L, B: Boundedness> CycleComplete<'a, ForwardRefMarker> for Optional<T, L, B>
139where
140 L: Location<'a> + NoTick,
141{
142 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
143 assert_eq!(
144 self.location.id(),
145 expected_location,
146 "locations do not match"
147 );
148 let metadata = self.location.new_node_metadata::<T>();
149 self.location
150 .flow_state()
151 .borrow_mut()
152 .leaves
153 .as_mut()
154 .expect(FLOW_USED_MESSAGE)
155 .push(HydroLeaf::CycleSink {
156 ident,
157 input: Box::new(HydroNode::Unpersist {
158 inner: Box::new(self.ir_node.into_inner()),
159 metadata: metadata.clone(),
160 }),
161 metadata,
162 });
163 }
164}
165
166impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
167where
168 L: Location<'a>,
169{
170 fn from(singleton: Optional<T, L, Bounded>) -> Self {
171 Optional::new(singleton.location, singleton.ir_node.into_inner())
172 }
173}
174
175impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
176where
177 L: Location<'a>,
178{
179 fn from(singleton: Singleton<T, L, B>) -> Self {
180 Optional::some(singleton)
181 }
182}
183
184impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
185where
186 T: Clone,
187 L: Location<'a>,
188{
189 fn clone(&self) -> Self {
190 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
191 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
192 *self.ir_node.borrow_mut() = HydroNode::Tee {
193 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
194 metadata: self.location.new_node_metadata::<T>(),
195 };
196 }
197
198 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
199 Optional {
200 location: self.location.clone(),
201 ir_node: HydroNode::Tee {
202 inner: TeeNode(inner.0.clone()),
203 metadata: metadata.clone(),
204 }
205 .into(),
206 _phantom: PhantomData,
207 }
208 } else {
209 unreachable!()
210 }
211 }
212}
213
214impl<'a, T, L, B: Boundedness> Optional<T, L, B>
215where
216 L: Location<'a>,
217{
218 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
219 Optional {
220 location,
221 ir_node: RefCell::new(ir_node),
222 _phantom: PhantomData,
223 }
224 }
225
226 pub fn some(singleton: Singleton<T, L, B>) -> Self {
227 Optional::new(singleton.location, singleton.ir_node.into_inner())
228 }
229
230 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
249 where
250 F: Fn(T) -> U + 'a,
251 {
252 let f = f.splice_fn1_ctx(&self.location).into();
253 Optional::new(
254 self.location.clone(),
255 HydroNode::Map {
256 f,
257 input: Box::new(self.ir_node.into_inner()),
258 metadata: self.location.new_node_metadata::<U>(),
259 },
260 )
261 }
262
263 pub fn flat_map_ordered<U, I, F>(
264 self,
265 f: impl IntoQuotedMut<'a, F, L>,
266 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
267 where
268 I: IntoIterator<Item = U>,
269 F: Fn(T) -> I + 'a,
270 {
271 let f = f.splice_fn1_ctx(&self.location).into();
272 Stream::new(
273 self.location.clone(),
274 HydroNode::FlatMap {
275 f,
276 input: Box::new(self.ir_node.into_inner()),
277 metadata: self.location.new_node_metadata::<U>(),
278 },
279 )
280 }
281
282 pub fn flat_map_unordered<U, I, F>(
283 self,
284 f: impl IntoQuotedMut<'a, F, L>,
285 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
286 where
287 I: IntoIterator<Item = U>,
288 F: Fn(T) -> I + 'a,
289 {
290 let f = f.splice_fn1_ctx(&self.location).into();
291 Stream::new(
292 self.location.clone(),
293 HydroNode::FlatMap {
294 f,
295 input: Box::new(self.ir_node.into_inner()),
296 metadata: self.location.new_node_metadata::<U>(),
297 },
298 )
299 }
300
301 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
302 where
303 T: IntoIterator<Item = U>,
304 {
305 self.flat_map_ordered(q!(|v| v))
306 }
307
308 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
309 where
310 T: IntoIterator<Item = U>,
311 {
312 self.flat_map_unordered(q!(|v| v))
313 }
314
315 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
316 where
317 F: Fn(&T) -> bool + 'a,
318 {
319 let f = f.splice_fn1_borrow_ctx(&self.location).into();
320 Optional::new(
321 self.location.clone(),
322 HydroNode::Filter {
323 f,
324 input: Box::new(self.ir_node.into_inner()),
325 metadata: self.location.new_node_metadata::<T>(),
326 },
327 )
328 }
329
330 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
331 where
332 F: Fn(T) -> Option<U> + 'a,
333 {
334 let f = f.splice_fn1_ctx(&self.location).into();
335 Optional::new(
336 self.location.clone(),
337 HydroNode::FilterMap {
338 f,
339 input: Box::new(self.ir_node.into_inner()),
340 metadata: self.location.new_node_metadata::<U>(),
341 },
342 )
343 }
344
345 pub fn union(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
346 check_matching_location(&self.location, &other.location);
347
348 if L::is_top_level() {
349 Optional::new(
350 self.location.clone(),
351 HydroNode::Persist {
352 inner: Box::new(HydroNode::Chain {
353 first: Box::new(HydroNode::Unpersist {
354 inner: Box::new(self.ir_node.into_inner()),
355 metadata: self.location.new_node_metadata::<T>(),
356 }),
357 second: Box::new(HydroNode::Unpersist {
358 inner: Box::new(other.ir_node.into_inner()),
359 metadata: self.location.new_node_metadata::<T>(),
360 }),
361 metadata: self.location.new_node_metadata::<T>(),
362 }),
363 metadata: self.location.new_node_metadata::<T>(),
364 },
365 )
366 } else {
367 Optional::new(
368 self.location.clone(),
369 HydroNode::Chain {
370 first: Box::new(self.ir_node.into_inner()),
371 second: Box::new(other.ir_node.into_inner()),
372 metadata: self.location.new_node_metadata::<T>(),
373 },
374 )
375 }
376 }
377
378 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
379 where
380 O: Clone,
381 {
382 let other: Optional<O, L, B> = other.into();
383 check_matching_location(&self.location, &other.location);
384
385 if L::is_top_level() {
386 Optional::new(
387 self.location.clone(),
388 HydroNode::Persist {
389 inner: Box::new(HydroNode::CrossSingleton {
390 left: Box::new(HydroNode::Unpersist {
391 inner: Box::new(self.ir_node.into_inner()),
392 metadata: self.location.new_node_metadata::<T>(),
393 }),
394 right: Box::new(HydroNode::Unpersist {
395 inner: Box::new(other.ir_node.into_inner()),
396 metadata: self.location.new_node_metadata::<O>(),
397 }),
398 metadata: self.location.new_node_metadata::<(T, O)>(),
399 }),
400 metadata: self.location.new_node_metadata::<(T, O)>(),
401 },
402 )
403 } else {
404 Optional::new(
405 self.location.clone(),
406 HydroNode::CrossSingleton {
407 left: Box::new(self.ir_node.into_inner()),
408 right: Box::new(other.ir_node.into_inner()),
409 metadata: self.location.new_node_metadata::<(T, O)>(),
410 },
411 )
412 }
413 }
414
415 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
416 check_matching_location(&self.location, &other.location);
417
418 if L::is_top_level() {
419 Singleton::new(
420 self.location.clone(),
421 HydroNode::Persist {
422 inner: Box::new(HydroNode::Chain {
423 first: Box::new(HydroNode::Unpersist {
424 inner: Box::new(self.ir_node.into_inner()),
425 metadata: self.location.new_node_metadata::<T>(),
426 }),
427 second: Box::new(HydroNode::Unpersist {
428 inner: Box::new(other.ir_node.into_inner()),
429 metadata: self.location.new_node_metadata::<T>(),
430 }),
431 metadata: self.location.new_node_metadata::<T>(),
432 }),
433 metadata: self.location.new_node_metadata::<T>(),
434 },
435 )
436 } else {
437 Singleton::new(
438 self.location.clone(),
439 HydroNode::Chain {
440 first: Box::new(self.ir_node.into_inner()),
441 second: Box::new(other.ir_node.into_inner()),
442 metadata: self.location.new_node_metadata::<T>(),
443 },
444 )
445 }
446 }
447
448 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
449 where
450 T: Clone,
451 {
452 let none: syn::Expr = parse_quote!([::std::option::Option::None]);
453 let core_ir = HydroNode::Persist {
454 inner: Box::new(HydroNode::Source {
455 source: HydroSource::Iter(none.into()),
456 metadata: self.location.root().new_node_metadata::<Option<T>>(),
457 }),
458 metadata: self.location.new_node_metadata::<Option<T>>(),
459 };
460
461 let none_singleton = if L::is_top_level() {
462 Singleton::new(
463 self.location.clone(),
464 HydroNode::Persist {
465 inner: Box::new(core_ir),
466 metadata: self.location.new_node_metadata::<Option<T>>(),
467 },
468 )
469 } else {
470 Singleton::new(self.location.clone(), core_ir)
471 };
472
473 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
474 }
475
476 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
479 {
480 let mut node = self.ir_node.borrow_mut();
481 let metadata = node.metadata_mut();
482 metadata.tag = Some(name.to_string());
483 }
484 self
485 }
486}
487
488impl<'a, T, L> Optional<T, L, Bounded>
489where
490 L: Location<'a>,
491{
492 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
493 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
494 }
495
496 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
497 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
498 }
499
500 pub fn then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded>
501 where
502 Singleton<U, L, Bounded>: ZipResult<
503 'a,
504 Optional<(), L, Bounded>,
505 Location = L,
506 Out = Optional<(U, ()), L, Bounded>,
507 >,
508 {
509 value.continue_if(self)
510 }
511
512 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
513 if L::is_top_level() {
514 panic!("Converting an optional to a stream is not yet supported at the top level");
515 }
516
517 Stream::new(self.location, self.ir_node.into_inner())
518 }
519}
520
521impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
522where
523 L: Location<'a> + NoTick,
524{
525 pub fn snapshot(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
534 Optional::new(
535 self.location.clone().tick,
536 HydroNode::Unpersist {
537 inner: Box::new(self.ir_node.into_inner()),
538 metadata: self.location.new_node_metadata::<T>(),
539 },
540 )
541 }
542
543 pub fn end_atomic(self) -> Optional<T, L, B> {
544 Optional::new(self.location.tick.l, self.ir_node.into_inner())
545 }
546}
547
548impl<'a, T, L, B: Boundedness> Optional<T, L, B>
549where
550 L: Location<'a> + NoTick + NoAtomic,
551{
552 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
553 Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
554 }
555
556 pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
565 self.atomic(tick).snapshot(nondet)
566 }
567
568 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
576 let tick = self.location.tick();
577 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
578 }
579
580 pub fn sample_every(
590 self,
591 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
592 nondet: NonDet,
593 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
594 let samples = self.location.source_interval(interval, nondet);
595 let tick = self.location.tick();
596
597 self.snapshot(&tick, nondet)
598 .continue_if(samples.batch(&tick, nondet).first())
599 .all_ticks()
600 .weakest_retries()
601 }
602}
603
604impl<'a, T, L> Optional<T, Tick<L>, Bounded>
605where
606 L: Location<'a>,
607{
608 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
609 Stream::new(
610 self.location.outer().clone(),
611 HydroNode::Persist {
612 inner: Box::new(self.ir_node.into_inner()),
613 metadata: self.location.new_node_metadata::<T>(),
614 },
615 )
616 }
617
618 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
619 Stream::new(
620 Atomic {
621 tick: self.location.clone(),
622 },
623 HydroNode::Persist {
624 inner: Box::new(self.ir_node.into_inner()),
625 metadata: self.location.new_node_metadata::<T>(),
626 },
627 )
628 }
629
630 pub fn latest(self) -> Optional<T, L, Unbounded> {
631 Optional::new(
632 self.location.outer().clone(),
633 HydroNode::Persist {
634 inner: Box::new(self.ir_node.into_inner()),
635 metadata: self.location.new_node_metadata::<T>(),
636 },
637 )
638 }
639
640 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
641 Optional::new(
642 Atomic {
643 tick: self.location.clone(),
644 },
645 HydroNode::Persist {
646 inner: Box::new(self.ir_node.into_inner()),
647 metadata: self.location.new_node_metadata::<T>(),
648 },
649 )
650 }
651
652 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
653 Optional::new(
654 self.location.clone(),
655 HydroNode::DeferTick {
656 input: Box::new(self.ir_node.into_inner()),
657 metadata: self.location.new_node_metadata::<T>(),
658 },
659 )
660 }
661
662 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
663 Stream::new(
664 self.location.clone(),
665 HydroNode::Persist {
666 inner: Box::new(self.ir_node.into_inner()),
667 metadata: self.location.new_node_metadata::<T>(),
668 },
669 )
670 }
671
672 pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
673 Optional::new(
674 self.location.clone(),
675 HydroNode::Delta {
676 inner: Box::new(self.ir_node.into_inner()),
677 metadata: self.location.new_node_metadata::<T>(),
678 },
679 )
680 }
681}