1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use dfir_rs::futures::stream::Stream as FuturesStream;
6use dfir_rs::{tokio, tokio_stream};
7use proc_macro2::Span;
8use stageleft::{QuotedWithContext, q};
9
10use super::builder::FlowState;
11use crate::cycle::{CycleCollection, ForwardRef, ForwardRefMarker};
12use crate::ir::{DebugType, HydroIrMetadata, HydroNode, HydroSource};
13use crate::{Singleton, Stream, Unbounded};
14
15pub mod external_process;
16pub use external_process::ExternalProcess;
17
18pub mod process;
19pub use process::Process;
20
21pub mod cluster;
22pub use cluster::{Cluster, ClusterId};
23
24pub mod can_send;
25pub use can_send::CanSend;
26
27pub mod tick;
28pub use tick::{Atomic, NoTick, Tick};
29
/// Identifies where a piece of the dataflow lives.
///
/// The `usize` payloads are numeric ids; [`LocationId::raw_id`] exposes them
/// for every variant except [`LocationId::Tick`].
#[derive(PartialEq, Eq, Clone, Debug, Hash)]
pub enum LocationId {
    /// A process, carrying its numeric id.
    Process(usize),
    /// A cluster, carrying its numeric id.
    Cluster(usize),
    /// A tick, carrying a numeric id (presumably the clock id; confirm against
    /// the code that constructs it) and the location the tick is nested in.
    Tick(usize, Box<LocationId>),
    /// An external process, carrying its numeric id.
    ExternalProcess(usize),
}

impl LocationId {
    /// Returns the outermost non-tick location.
    ///
    /// Tick layers are peeled off one at a time until a `Process`, `Cluster`
    /// or `ExternalProcess` is reached; those variants are their own root.
    pub fn root(&self) -> &LocationId {
        let mut current = self;
        while let Self::Tick(_, parent) = current {
            current = &**parent;
        }
        current
    }

    /// Returns the numeric id stored in a non-tick location.
    ///
    /// # Panics
    /// Panics with `"cannot get raw id for tick"` when called on a
    /// [`LocationId::Tick`].
    pub fn raw_id(&self) -> usize {
        match self {
            Self::Process(raw) | Self::Cluster(raw) | Self::ExternalProcess(raw) => *raw,
            Self::Tick(..) => panic!("cannot get raw id for tick"),
        }
    }
}
57
58pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
59 assert_eq!(l1.id(), l2.id(), "locations do not match");
60}
61
62pub trait Location<'a>: Clone {
63 type Root: Location<'a>;
64
65 fn root(&self) -> Self::Root;
66
67 fn id(&self) -> LocationId;
68
69 fn flow_state(&self) -> &FlowState;
70
71 fn is_top_level() -> bool;
72
73 fn tick(&self) -> Tick<Self>
74 where
75 Self: NoTick,
76 {
77 let next_id = self.flow_state().borrow_mut().next_clock_id;
78 self.flow_state().borrow_mut().next_clock_id += 1;
79 Tick {
80 id: next_id,
81 l: self.clone(),
82 }
83 }
84
85 fn next_node_id(&self) -> usize {
86 let next_id = self.flow_state().borrow_mut().next_node_id;
87 self.flow_state().borrow_mut().next_node_id += 1;
88 next_id
89 }
90
91 fn new_node_metadata<T>(&self) -> HydroIrMetadata {
92 HydroIrMetadata {
93 location_kind: self.id(),
94 output_type: Some(DebugType(stageleft::quote_type::<T>())),
95 cardinality: None,
96 cpu_usage: None,
97 }
98 }
99
100 fn spin(&self) -> Stream<(), Self, Unbounded>
101 where
102 Self: Sized + NoTick,
103 {
104 Stream::new(
105 self.clone(),
106 HydroNode::Persist {
107 inner: Box::new(HydroNode::Source {
108 source: HydroSource::Spin(),
109 location_kind: self.id(),
110 metadata: self.new_node_metadata::<()>(),
111 }),
112 metadata: self.new_node_metadata::<()>(),
113 },
114 )
115 }
116
117 fn source_stream<T, E: FuturesStream<Item = T> + Unpin>(
118 &self,
119 e: impl QuotedWithContext<'a, E, Self>,
120 ) -> Stream<T, Self, Unbounded>
121 where
122 Self: Sized + NoTick,
123 {
124 let e = e.splice_untyped_ctx(self);
125
126 Stream::new(
127 self.clone(),
128 HydroNode::Persist {
129 inner: Box::new(HydroNode::Source {
130 source: HydroSource::Stream(e.into()),
131 location_kind: self.id(),
132 metadata: self.new_node_metadata::<T>(),
133 }),
134 metadata: self.new_node_metadata::<T>(),
135 },
136 )
137 }
138
139 fn source_iter<T, E: IntoIterator<Item = T>>(
140 &self,
141 e: impl QuotedWithContext<'a, E, Self>,
142 ) -> Stream<T, Self, Unbounded>
143 where
144 Self: Sized + NoTick,
145 {
146 let e = e.splice_untyped_ctx(self);
149
150 Stream::new(
151 self.clone(),
152 HydroNode::Persist {
153 inner: Box::new(HydroNode::Source {
154 source: HydroSource::Iter(e.into()),
155 location_kind: self.id(),
156 metadata: self.new_node_metadata::<T>(),
157 }),
158 metadata: self.new_node_metadata::<T>(),
159 },
160 )
161 }
162
163 fn singleton<T: Clone>(
164 &self,
165 e: impl QuotedWithContext<'a, T, Self>,
166 ) -> Singleton<T, Self, Unbounded>
167 where
168 Self: Sized + NoTick,
169 {
170 let e_arr = q!([e]);
174 let e = e_arr.splice_untyped_ctx(self);
175
176 Singleton::new(
180 self.clone(),
181 HydroNode::Persist {
182 inner: Box::new(HydroNode::Persist {
183 inner: Box::new(HydroNode::Source {
184 source: HydroSource::Iter(e.into()),
185 location_kind: self.id(),
186 metadata: self.new_node_metadata::<T>(),
187 }),
188 metadata: self.new_node_metadata::<T>(),
189 }),
190 metadata: self.new_node_metadata::<T>(),
191 },
192 )
193 }
194
195 unsafe fn source_interval(
205 &self,
206 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
207 ) -> Stream<tokio::time::Instant, Self, Unbounded>
208 where
209 Self: Sized + NoTick,
210 {
211 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
212 tokio::time::interval(interval)
213 )))
214 }
215
216 unsafe fn source_interval_delayed(
227 &self,
228 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
229 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
230 ) -> Stream<tokio::time::Instant, Self, Unbounded>
231 where
232 Self: Sized + NoTick,
233 {
234 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
235 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
236 )))
237 }
238
239 fn forward_ref<S: CycleCollection<'a, ForwardRefMarker, Location = Self>>(
240 &self,
241 ) -> (ForwardRef<'a, S>, S)
242 where
243 Self: NoTick,
244 {
245 let next_id = {
246 let on_id = match self.id() {
247 LocationId::Process(id) => id,
248 LocationId::Cluster(id) => id,
249 LocationId::Tick(_, _) => panic!(),
250 LocationId::ExternalProcess(_) => panic!(),
251 };
252
253 let mut flow_state = self.flow_state().borrow_mut();
254 let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
255
256 let id = *next_id_entry;
257 *next_id_entry += 1;
258 id
259 };
260
261 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
262
263 (
264 ForwardRef {
265 completed: false,
266 ident: ident.clone(),
267 expected_location: self.id(),
268 _phantom: PhantomData,
269 },
270 S::create_source(ident, self.clone()),
271 )
272 }
273}