1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7
8use crate::builder::FLOW_USED_MESSAGE;
9use crate::cycle::{
10 CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, ForwardRefMarker,
11 TickCycleMarker,
12};
13use crate::ir::{HydroLeaf, HydroNode, TeeNode};
14use crate::location::tick::{Atomic, NoAtomic};
15use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
16use crate::{Bounded, Optional, Stream, Unbounded};
17
18pub struct Singleton<T, L, B> {
19 pub(crate) location: L,
20 pub(crate) ir_node: RefCell<HydroNode>,
21
22 _phantom: PhantomData<(T, L, B)>,
23}
24
25impl<'a, T, L: Location<'a>, B> Singleton<T, L, B> {
26 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
27 Singleton {
28 location,
29 ir_node: RefCell::new(ir_node),
30 _phantom: PhantomData,
31 }
32 }
33
34 fn location_kind(&self) -> LocationId {
35 self.location.id()
36 }
37}
38
39impl<'a, T, L: Location<'a>> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded> {
40 fn from(singleton: Singleton<T, L, Bounded>) -> Self {
41 Singleton::new(singleton.location, singleton.ir_node.into_inner())
42 }
43}
44
45impl<'a, T, L: Location<'a>> DeferTick for Singleton<T, Tick<L>, Bounded> {
46 fn defer_tick(self) -> Self {
47 Singleton::defer_tick(self)
48 }
49}
50
51impl<'a, T, L: Location<'a>> CycleCollectionWithInitial<'a, TickCycleMarker>
52 for Singleton<T, Tick<L>, Bounded>
53{
54 type Location = Tick<L>;
55
56 fn create_source(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
57 let location_id = location.id();
58 Singleton::new(
59 location.clone(),
60 HydroNode::Chain {
61 first: Box::new(HydroNode::CycleSource {
62 ident,
63 location_kind: location_id,
64 metadata: location.new_node_metadata::<T>(),
65 }),
66 second: initial.ir_node.into_inner().into(),
67 metadata: location.new_node_metadata::<T>(),
68 },
69 )
70 }
71}
72
73impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded> {
74 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75 assert_eq!(
76 self.location.id(),
77 expected_location,
78 "locations do not match"
79 );
80 self.location
81 .flow_state()
82 .borrow_mut()
83 .leaves
84 .as_mut()
85 .expect(FLOW_USED_MESSAGE)
86 .push(HydroLeaf::CycleSink {
87 ident,
88 location_kind: self.location_kind(),
89 input: Box::new(self.ir_node.into_inner()),
90 metadata: self.location.new_node_metadata::<T>(),
91 });
92 }
93}
94
95impl<'a, T, L: Location<'a>> CycleCollection<'a, ForwardRefMarker>
96 for Singleton<T, Tick<L>, Bounded>
97{
98 type Location = Tick<L>;
99
100 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
101 let location_id = location.id();
102 Singleton::new(
103 location.clone(),
104 HydroNode::CycleSource {
105 ident,
106 location_kind: location_id,
107 metadata: location.new_node_metadata::<T>(),
108 },
109 )
110 }
111}
112
113impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRefMarker>
114 for Singleton<T, Tick<L>, Bounded>
115{
116 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
117 assert_eq!(
118 self.location.id(),
119 expected_location,
120 "locations do not match"
121 );
122 self.location
123 .flow_state()
124 .borrow_mut()
125 .leaves
126 .as_mut()
127 .expect(FLOW_USED_MESSAGE)
128 .push(HydroLeaf::CycleSink {
129 ident,
130 location_kind: self.location_kind(),
131 input: Box::new(self.ir_node.into_inner()),
132 metadata: self.location.new_node_metadata::<T>(),
133 });
134 }
135}
136
137impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRefMarker>
138 for Singleton<T, L, B>
139{
140 type Location = L;
141
142 fn create_source(ident: syn::Ident, location: L) -> Self {
143 let location_id = location.id();
144 Singleton::new(
145 location.clone(),
146 HydroNode::Persist {
147 inner: Box::new(HydroNode::CycleSource {
148 ident,
149 location_kind: location_id,
150 metadata: location.new_node_metadata::<T>(),
151 }),
152 metadata: location.new_node_metadata::<T>(),
153 },
154 )
155 }
156}
157
158impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker>
159 for Singleton<T, L, B>
160{
161 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
162 assert_eq!(
163 self.location.id(),
164 expected_location,
165 "locations do not match"
166 );
167 let metadata = self.location.new_node_metadata::<T>();
168 self.location
169 .flow_state()
170 .borrow_mut()
171 .leaves
172 .as_mut()
173 .expect(FLOW_USED_MESSAGE)
174 .push(HydroLeaf::CycleSink {
175 ident,
176 location_kind: self.location_kind(),
177 input: Box::new(HydroNode::Unpersist {
178 inner: Box::new(self.ir_node.into_inner()),
179 metadata: metadata.clone(),
180 }),
181 metadata,
182 });
183 }
184}
185
186impl<'a, T: Clone, L: Location<'a>, B> Clone for Singleton<T, L, B> {
187 fn clone(&self) -> Self {
188 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
189 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
190 *self.ir_node.borrow_mut() = HydroNode::Tee {
191 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
192 metadata: self.location.new_node_metadata::<T>(),
193 };
194 }
195
196 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
197 Singleton {
198 location: self.location.clone(),
199 ir_node: HydroNode::Tee {
200 inner: TeeNode(inner.0.clone()),
201 metadata: metadata.clone(),
202 }
203 .into(),
204 _phantom: PhantomData,
205 }
206 } else {
207 unreachable!()
208 }
209 }
210}
211
212impl<'a, T, L: Location<'a>, B> Singleton<T, L, B> {
213 pub fn map<U, F: Fn(T) -> U + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B> {
214 let f = f.splice_fn1_ctx(&self.location).into();
215 Singleton::new(
216 self.location.clone(),
217 HydroNode::Map {
218 f,
219 input: Box::new(self.ir_node.into_inner()),
220 metadata: self.location.new_node_metadata::<U>(),
221 },
222 )
223 }
224
225 pub fn flat_map_ordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
226 self,
227 f: impl IntoQuotedMut<'a, F, L>,
228 ) -> Stream<U, L, B> {
229 let f = f.splice_fn1_ctx(&self.location).into();
230 Stream::new(
231 self.location.clone(),
232 HydroNode::FlatMap {
233 f,
234 input: Box::new(self.ir_node.into_inner()),
235 metadata: self.location.new_node_metadata::<U>(),
236 },
237 )
238 }
239
240 pub fn flat_map_unordered<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
241 self,
242 f: impl IntoQuotedMut<'a, F, L>,
243 ) -> Stream<U, L, B> {
244 let f = f.splice_fn1_ctx(&self.location).into();
245 Stream::new(
246 self.location.clone(),
247 HydroNode::FlatMap {
248 f,
249 input: Box::new(self.ir_node.into_inner()),
250 metadata: self.location.new_node_metadata::<U>(),
251 },
252 )
253 }
254
255 pub fn filter<F: Fn(&T) -> bool + 'a>(
256 self,
257 f: impl IntoQuotedMut<'a, F, L>,
258 ) -> Optional<T, L, B> {
259 let f = f.splice_fn1_borrow_ctx(&self.location).into();
260 Optional::new(
261 self.location.clone(),
262 HydroNode::Filter {
263 f,
264 input: Box::new(self.ir_node.into_inner()),
265 metadata: self.location.new_node_metadata::<T>(),
266 },
267 )
268 }
269
270 pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
271 self,
272 f: impl IntoQuotedMut<'a, F, L>,
273 ) -> Optional<U, L, B> {
274 let f = f.splice_fn1_ctx(&self.location).into();
275 Optional::new(
276 self.location.clone(),
277 HydroNode::FilterMap {
278 f,
279 input: Box::new(self.ir_node.into_inner()),
280 metadata: self.location.new_node_metadata::<U>(),
281 },
282 )
283 }
284
285 pub fn zip<Other>(self, other: Other) -> <Self as ZipResult<'a, Other>>::Out
286 where
287 Self: ZipResult<'a, Other, Location = L>,
288 {
289 check_matching_location(&self.location, &Self::other_location(&other));
290
291 if L::is_top_level() {
292 let left_ir_node = self.ir_node.into_inner();
293 let left_ir_node_metadata = left_ir_node.metadata().clone();
294 let right_ir_node = Self::other_ir_node(other);
295 let right_ir_node_metadata = right_ir_node.metadata().clone();
296
297 Self::make(
298 self.location.clone(),
299 HydroNode::Persist {
300 inner: Box::new(HydroNode::CrossSingleton {
301 left: Box::new(HydroNode::Unpersist {
302 inner: Box::new(left_ir_node),
303 metadata: left_ir_node_metadata,
304 }),
305 right: Box::new(HydroNode::Unpersist {
306 inner: Box::new(right_ir_node),
307 metadata: right_ir_node_metadata,
308 }),
309 metadata: self
310 .location
311 .new_node_metadata::<<Self as ZipResult<'a, Other>>::ElementType>(),
312 }),
313 metadata: self
314 .location
315 .new_node_metadata::<<Self as ZipResult<'a, Other>>::ElementType>(),
316 },
317 )
318 } else {
319 Self::make(
320 self.location.clone(),
321 HydroNode::CrossSingleton {
322 left: Box::new(self.ir_node.into_inner()),
323 right: Box::new(Self::other_ir_node(other)),
324 metadata: self
325 .location
326 .new_node_metadata::<<Self as ZipResult<'a, Other>>::ElementType>(),
327 },
328 )
329 }
330 }
331
332 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
333 where
334 Self: ZipResult<
335 'a,
336 Optional<(), L, Bounded>,
337 Location = L,
338 Out = Optional<(T, ()), L, Bounded>,
339 >,
340 {
341 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
342 }
343
344 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
345 where
346 Singleton<T, L, B>: ZipResult<
347 'a,
348 Optional<(), L, Bounded>,
349 Location = L,
350 Out = Optional<(T, ()), L, Bounded>,
351 >,
352 {
353 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
354 }
355}
356
357impl<'a, T, L: Location<'a> + NoTick, B> Singleton<T, Atomic<L>, B> {
358 pub unsafe fn latest_tick(self) -> Singleton<T, Tick<L>, Bounded> {
367 Singleton::new(
368 self.location.clone().tick,
369 HydroNode::Unpersist {
370 inner: Box::new(self.ir_node.into_inner()),
371 metadata: self.location.new_node_metadata::<T>(),
372 },
373 )
374 }
375
376 pub fn end_atomic(self) -> Optional<T, L, B> {
377 Optional::new(self.location.tick.l, self.ir_node.into_inner())
378 }
379}
380
381impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B> Singleton<T, L, B> {
382 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
383 Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
384 }
385
386 pub unsafe fn latest_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
395 where
396 L: NoTick,
397 {
398 unsafe { self.atomic(tick).latest_tick() }
399 }
400
401 pub unsafe fn sample_eager(self) -> Stream<T, L, Unbounded> {
409 let tick = self.location.tick();
410
411 unsafe {
412 self.latest_tick(&tick).all_ticks()
414 }
415 }
416
417 pub unsafe fn sample_every(
427 self,
428 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
429 ) -> Stream<T, L, Unbounded>
430 where
431 L: NoAtomic,
432 {
433 let samples = unsafe {
434 self.location.source_interval(interval)
436 };
437 let tick = self.location.tick();
438
439 unsafe {
440 self.latest_tick(&tick)
442 .continue_if(samples.tick_batch(&tick).first())
443 .all_ticks()
444 }
445 }
446}
447
448impl<'a, T, L: Location<'a>> Singleton<T, Tick<L>, Bounded> {
449 pub fn all_ticks(self) -> Stream<T, L, Unbounded> {
450 Stream::new(
451 self.location.outer().clone(),
452 HydroNode::Persist {
453 inner: Box::new(self.ir_node.into_inner()),
454 metadata: self.location.new_node_metadata::<T>(),
455 },
456 )
457 }
458
459 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded> {
460 Stream::new(
461 Atomic {
462 tick: self.location.clone(),
463 },
464 HydroNode::Persist {
465 inner: Box::new(self.ir_node.into_inner()),
466 metadata: self.location.new_node_metadata::<T>(),
467 },
468 )
469 }
470
471 pub fn latest(self) -> Singleton<T, L, Unbounded> {
472 Singleton::new(
473 self.location.outer().clone(),
474 HydroNode::Persist {
475 inner: Box::new(self.ir_node.into_inner()),
476 metadata: self.location.new_node_metadata::<T>(),
477 },
478 )
479 }
480
481 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
482 Singleton::new(
483 Atomic {
484 tick: self.location.clone(),
485 },
486 HydroNode::Persist {
487 inner: Box::new(self.ir_node.into_inner()),
488 metadata: self.location.new_node_metadata::<T>(),
489 },
490 )
491 }
492
493 pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
494 Singleton::new(
495 self.location.clone(),
496 HydroNode::DeferTick {
497 input: Box::new(self.ir_node.into_inner()),
498 metadata: self.location.new_node_metadata::<T>(),
499 },
500 )
501 }
502
503 pub fn persist(self) -> Stream<T, Tick<L>, Bounded> {
504 Stream::new(
505 self.location.clone(),
506 HydroNode::Persist {
507 inner: Box::new(self.ir_node.into_inner()),
508 metadata: self.location.new_node_metadata::<T>(),
509 },
510 )
511 }
512
513 pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
514 Optional::new(
515 self.location.clone(),
516 HydroNode::Delta {
517 inner: Box::new(self.ir_node.into_inner()),
518 metadata: self.location.new_node_metadata::<T>(),
519 },
520 )
521 }
522
523 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded> {
524 Stream::new(self.location, self.ir_node.into_inner())
525 }
526}
527
528pub trait ZipResult<'a, Other> {
529 type Out;
530 type ElementType;
531 type Location;
532
533 fn other_location(other: &Other) -> Self::Location;
534 fn other_ir_node(other: Other) -> HydroNode;
535
536 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
537}
538
539impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Singleton<U, Tick<L>, B>>
540 for Singleton<T, Tick<L>, B>
541{
542 type Out = Singleton<(T, U), Tick<L>, B>;
543 type ElementType = (T, U);
544 type Location = Tick<L>;
545
546 fn other_location(other: &Singleton<U, Tick<L>, B>) -> Tick<L> {
547 other.location.clone()
548 }
549
550 fn other_ir_node(other: Singleton<U, Tick<L>, B>) -> HydroNode {
551 other.ir_node.into_inner()
552 }
553
554 fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
555 Singleton::new(location, ir_node)
556 }
557}
558
559impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Optional<U, Tick<L>, B>>
560 for Singleton<T, Tick<L>, B>
561{
562 type Out = Optional<(T, U), Tick<L>, B>;
563 type ElementType = (T, U);
564 type Location = Tick<L>;
565
566 fn other_location(other: &Optional<U, Tick<L>, B>) -> Tick<L> {
567 other.location.clone()
568 }
569
570 fn other_ir_node(other: Optional<U, Tick<L>, B>) -> HydroNode {
571 other.ir_node.into_inner()
572 }
573
574 fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
575 Optional::new(location, ir_node)
576 }
577}