1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7
8use crate::builder::FLOW_USED_MESSAGE;
9use crate::cycle::{
10 CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, ForwardRefMarker,
11 TickCycleMarker,
12};
13use crate::ir::{HydroLeaf, HydroNode, TeeNode};
14use crate::location::tick::{Atomic, NoAtomic};
15use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
16use crate::stream::{AtLeastOnce, ExactlyOnce};
17use crate::{Bounded, Optional, Stream, TotalOrder, Unbounded};
18
19pub struct Singleton<Type, Loc, Bound> {
20 pub(crate) location: Loc,
21 pub(crate) ir_node: RefCell<HydroNode>,
22
23 _phantom: PhantomData<(Type, Loc, Bound)>,
24}
25
26impl<'a, T, L, B> Singleton<T, L, B>
27where
28 L: Location<'a>,
29{
30 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
31 Singleton {
32 location,
33 ir_node: RefCell::new(ir_node),
34 _phantom: PhantomData,
35 }
36 }
37
38 fn location_kind(&self) -> LocationId {
39 self.location.id()
40 }
41}
42
43impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
44where
45 L: Location<'a>,
46{
47 fn from(singleton: Singleton<T, L, Bounded>) -> Self {
48 Singleton::new(singleton.location, singleton.ir_node.into_inner())
49 }
50}
51
52impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
53where
54 L: Location<'a>,
55{
56 fn defer_tick(self) -> Self {
57 Singleton::defer_tick(self)
58 }
59}
60
61impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
62where
63 L: Location<'a>,
64{
65 type Location = Tick<L>;
66
67 fn create_source(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
68 let location_id = location.id();
69 Singleton::new(
70 location.clone(),
71 HydroNode::Chain {
72 first: Box::new(HydroNode::CycleSource {
73 ident,
74 location_kind: location_id,
75 metadata: location.new_node_metadata::<T>(),
76 }),
77 second: initial.ir_node.into_inner().into(),
78 metadata: location.new_node_metadata::<T>(),
79 },
80 )
81 }
82}
83
84impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
85where
86 L: Location<'a>,
87{
88 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
89 assert_eq!(
90 self.location.id(),
91 expected_location,
92 "locations do not match"
93 );
94 self.location
95 .flow_state()
96 .borrow_mut()
97 .leaves
98 .as_mut()
99 .expect(FLOW_USED_MESSAGE)
100 .push(HydroLeaf::CycleSink {
101 ident,
102 location_kind: self.location_kind(),
103 input: Box::new(self.ir_node.into_inner()),
104 metadata: self.location.new_node_metadata::<T>(),
105 });
106 }
107}
108
109impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
110where
111 L: Location<'a>,
112{
113 type Location = Tick<L>;
114
115 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
116 let location_id = location.id();
117 Singleton::new(
118 location.clone(),
119 HydroNode::CycleSource {
120 ident,
121 location_kind: location_id,
122 metadata: location.new_node_metadata::<T>(),
123 },
124 )
125 }
126}
127
128impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
129where
130 L: Location<'a>,
131{
132 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
133 assert_eq!(
134 self.location.id(),
135 expected_location,
136 "locations do not match"
137 );
138 self.location
139 .flow_state()
140 .borrow_mut()
141 .leaves
142 .as_mut()
143 .expect(FLOW_USED_MESSAGE)
144 .push(HydroLeaf::CycleSink {
145 ident,
146 location_kind: self.location_kind(),
147 input: Box::new(self.ir_node.into_inner()),
148 metadata: self.location.new_node_metadata::<T>(),
149 });
150 }
151}
152
153impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Singleton<T, L, B>
154where
155 L: Location<'a> + NoTick,
156{
157 type Location = L;
158
159 fn create_source(ident: syn::Ident, location: L) -> Self {
160 let location_id = location.id();
161 Singleton::new(
162 location.clone(),
163 HydroNode::Persist {
164 inner: Box::new(HydroNode::CycleSource {
165 ident,
166 location_kind: location_id,
167 metadata: location.new_node_metadata::<T>(),
168 }),
169 metadata: location.new_node_metadata::<T>(),
170 },
171 )
172 }
173}
174
175impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Singleton<T, L, B>
176where
177 L: Location<'a> + NoTick,
178{
179 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
180 assert_eq!(
181 self.location.id(),
182 expected_location,
183 "locations do not match"
184 );
185 let metadata = self.location.new_node_metadata::<T>();
186 self.location
187 .flow_state()
188 .borrow_mut()
189 .leaves
190 .as_mut()
191 .expect(FLOW_USED_MESSAGE)
192 .push(HydroLeaf::CycleSink {
193 ident,
194 location_kind: self.location_kind(),
195 input: Box::new(HydroNode::Unpersist {
196 inner: Box::new(self.ir_node.into_inner()),
197 metadata: metadata.clone(),
198 }),
199 metadata,
200 });
201 }
202}
203
204impl<'a, T, L, B> Clone for Singleton<T, L, B>
205where
206 T: Clone,
207 L: Location<'a>,
208{
209 fn clone(&self) -> Self {
210 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
211 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
212 *self.ir_node.borrow_mut() = HydroNode::Tee {
213 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
214 metadata: self.location.new_node_metadata::<T>(),
215 };
216 }
217
218 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
219 Singleton {
220 location: self.location.clone(),
221 ir_node: HydroNode::Tee {
222 inner: TeeNode(inner.0.clone()),
223 metadata: metadata.clone(),
224 }
225 .into(),
226 _phantom: PhantomData,
227 }
228 } else {
229 unreachable!()
230 }
231 }
232}
233
234impl<'a, T, L, B> Singleton<T, L, B>
235where
236 L: Location<'a>,
237{
238 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
239 where
240 F: Fn(T) -> U + 'a,
241 {
242 let f = f.splice_fn1_ctx(&self.location).into();
243 Singleton::new(
244 self.location.clone(),
245 HydroNode::Map {
246 f,
247 input: Box::new(self.ir_node.into_inner()),
248 metadata: self.location.new_node_metadata::<U>(),
249 },
250 )
251 }
252
253 pub fn flat_map_ordered<U, I, F>(
254 self,
255 f: impl IntoQuotedMut<'a, F, L>,
256 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
257 where
258 I: IntoIterator<Item = U>,
259 F: Fn(T) -> I + 'a,
260 {
261 let f = f.splice_fn1_ctx(&self.location).into();
262 Stream::new(
263 self.location.clone(),
264 HydroNode::FlatMap {
265 f,
266 input: Box::new(self.ir_node.into_inner()),
267 metadata: self.location.new_node_metadata::<U>(),
268 },
269 )
270 }
271
272 pub fn flat_map_unordered<U, I, F>(
273 self,
274 f: impl IntoQuotedMut<'a, F, L>,
275 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
276 where
277 I: IntoIterator<Item = U>,
278 F: Fn(T) -> I + 'a,
279 {
280 let f = f.splice_fn1_ctx(&self.location).into();
281 Stream::new(
282 self.location.clone(),
283 HydroNode::FlatMap {
284 f,
285 input: Box::new(self.ir_node.into_inner()),
286 metadata: self.location.new_node_metadata::<U>(),
287 },
288 )
289 }
290
291 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
292 where
293 F: Fn(&T) -> bool + 'a,
294 {
295 let f = f.splice_fn1_borrow_ctx(&self.location).into();
296 Optional::new(
297 self.location.clone(),
298 HydroNode::Filter {
299 f,
300 input: Box::new(self.ir_node.into_inner()),
301 metadata: self.location.new_node_metadata::<T>(),
302 },
303 )
304 }
305
306 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
307 where
308 F: Fn(T) -> Option<U> + 'a,
309 {
310 let f = f.splice_fn1_ctx(&self.location).into();
311 Optional::new(
312 self.location.clone(),
313 HydroNode::FilterMap {
314 f,
315 input: Box::new(self.ir_node.into_inner()),
316 metadata: self.location.new_node_metadata::<U>(),
317 },
318 )
319 }
320
321 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
322 where
323 Self: ZipResult<'a, O, Location = L>,
324 {
325 check_matching_location(&self.location, &Self::other_location(&other));
326
327 if L::is_top_level() {
328 let left_ir_node = self.ir_node.into_inner();
329 let left_ir_node_metadata = left_ir_node.metadata().clone();
330 let right_ir_node = Self::other_ir_node(other);
331 let right_ir_node_metadata = right_ir_node.metadata().clone();
332
333 Self::make(
334 self.location.clone(),
335 HydroNode::Persist {
336 inner: Box::new(HydroNode::CrossSingleton {
337 left: Box::new(HydroNode::Unpersist {
338 inner: Box::new(left_ir_node),
339 metadata: left_ir_node_metadata,
340 }),
341 right: Box::new(HydroNode::Unpersist {
342 inner: Box::new(right_ir_node),
343 metadata: right_ir_node_metadata,
344 }),
345 metadata: self
346 .location
347 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
348 }),
349 metadata: self
350 .location
351 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
352 },
353 )
354 } else {
355 Self::make(
356 self.location.clone(),
357 HydroNode::CrossSingleton {
358 left: Box::new(self.ir_node.into_inner()),
359 right: Box::new(Self::other_ir_node(other)),
360 metadata: self
361 .location
362 .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
363 },
364 )
365 }
366 }
367
368 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
369 where
370 Self: ZipResult<
371 'a,
372 Optional<(), L, Bounded>,
373 Location = L,
374 Out = Optional<(T, ()), L, Bounded>,
375 >,
376 {
377 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
378 }
379
380 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
381 where
382 Singleton<T, L, B>: ZipResult<
383 'a,
384 Optional<(), L, Bounded>,
385 Location = L,
386 Out = Optional<(T, ()), L, Bounded>,
387 >,
388 {
389 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
390 }
391}
392
393impl<'a, T, L, B> Singleton<T, Atomic<L>, B>
394where
395 L: Location<'a> + NoTick,
396{
397 pub unsafe fn latest_tick(self) -> Singleton<T, Tick<L>, Bounded> {
406 Singleton::new(
407 self.location.clone().tick,
408 HydroNode::Unpersist {
409 inner: Box::new(self.ir_node.into_inner()),
410 metadata: self.location.new_node_metadata::<T>(),
411 },
412 )
413 }
414
415 pub fn end_atomic(self) -> Optional<T, L, B> {
416 Optional::new(self.location.tick.l, self.ir_node.into_inner())
417 }
418}
419
420impl<'a, T, L, B> Singleton<T, L, B>
421where
422 L: Location<'a> + NoTick + NoAtomic,
423{
424 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
425 Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
426 }
427
428 pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
437 where
438 L: NoTick,
439 {
440 unsafe { self.atomic(tick).latest_tick() }
441 }
442
443 pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
451 let tick = self.location.tick();
452
453 unsafe {
454 self.latest_tick(&tick).all_ticks().weakest_retries()
456 }
457 }
458
459 pub unsafe fn sample_every(
469 self,
470 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
471 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
472 let samples = unsafe {
473 self.location.source_interval(interval)
475 };
476 let tick = self.location.tick();
477
478 unsafe {
479 self.latest_tick(&tick)
481 .continue_if(samples.tick_batch(&tick).first())
482 .all_ticks()
483 .weakest_retries()
484 }
485 }
486}
487
488impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
489where
490 L: Location<'a>,
491{
492 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
493 Stream::new(
494 self.location.outer().clone(),
495 HydroNode::Persist {
496 inner: Box::new(self.ir_node.into_inner()),
497 metadata: self.location.new_node_metadata::<T>(),
498 },
499 )
500 }
501
502 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
503 Stream::new(
504 Atomic {
505 tick: self.location.clone(),
506 },
507 HydroNode::Persist {
508 inner: Box::new(self.ir_node.into_inner()),
509 metadata: self.location.new_node_metadata::<T>(),
510 },
511 )
512 }
513
514 pub fn latest(self) -> Singleton<T, L, Unbounded> {
515 Singleton::new(
516 self.location.outer().clone(),
517 HydroNode::Persist {
518 inner: Box::new(self.ir_node.into_inner()),
519 metadata: self.location.new_node_metadata::<T>(),
520 },
521 )
522 }
523
524 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
525 Singleton::new(
526 Atomic {
527 tick: self.location.clone(),
528 },
529 HydroNode::Persist {
530 inner: Box::new(self.ir_node.into_inner()),
531 metadata: self.location.new_node_metadata::<T>(),
532 },
533 )
534 }
535
536 pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
537 Singleton::new(
538 self.location.clone(),
539 HydroNode::DeferTick {
540 input: Box::new(self.ir_node.into_inner()),
541 metadata: self.location.new_node_metadata::<T>(),
542 },
543 )
544 }
545
546 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
547 Stream::new(
548 self.location.clone(),
549 HydroNode::Persist {
550 inner: Box::new(self.ir_node.into_inner()),
551 metadata: self.location.new_node_metadata::<T>(),
552 },
553 )
554 }
555
556 pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
557 Optional::new(
558 self.location.clone(),
559 HydroNode::Delta {
560 inner: Box::new(self.ir_node.into_inner()),
561 metadata: self.location.new_node_metadata::<T>(),
562 },
563 )
564 }
565
566 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
567 Stream::new(self.location, self.ir_node.into_inner())
568 }
569}
570
571pub trait ZipResult<'a, Other> {
572 type Out;
573 type ElementType;
574 type Location;
575
576 fn other_location(other: &Other) -> Self::Location;
577 fn other_ir_node(other: Other) -> HydroNode;
578
579 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
580}
581
582impl<'a, T, U, L, B> ZipResult<'a, Singleton<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
583where
584 U: Clone,
585 L: Location<'a>,
586{
587 type Out = Singleton<(T, U), Tick<L>, B>;
588 type ElementType = (T, U);
589 type Location = Tick<L>;
590
591 fn other_location(other: &Singleton<U, Tick<L>, B>) -> Tick<L> {
592 other.location.clone()
593 }
594
595 fn other_ir_node(other: Singleton<U, Tick<L>, B>) -> HydroNode {
596 other.ir_node.into_inner()
597 }
598
599 fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
600 Singleton::new(location, ir_node)
601 }
602}
603
604impl<'a, T, U, L, B> ZipResult<'a, Optional<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
605where
606 U: Clone,
607 L: Location<'a>,
608{
609 type Out = Optional<(T, U), Tick<L>, B>;
610 type ElementType = (T, U);
611 type Location = Tick<L>;
612
613 fn other_location(other: &Optional<U, Tick<L>, B>) -> Tick<L> {
614 other.location.clone()
615 }
616
617 fn other_ir_node(other: Optional<U, Tick<L>, B>) -> HydroNode {
618 other.ir_node.into_inner()
619 }
620
621 fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
622 Optional::new(location, ir_node)
623 }
624}