
1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::time::Duration;
5use dfir_rs::futures::stream::Stream as FuturesStream;
6use dfir_rs::{tokio, tokio_stream};
7use proc_macro2::Span;
8use stageleft::{QuotedWithContext, q};
10use super::builder::FlowState;
11use crate::cycle::{CycleCollection, ForwardRef, ForwardRefMarker};
12use crate::ir::{DebugType, HydroIrMetadata, HydroNode, HydroSource};
13use crate::{Singleton, Stream, Unbounded};
15pub mod external_process;
16pub use external_process::ExternalProcess;
18pub mod process;
19pub use process::Process;
21pub mod cluster;
22pub use cluster::{Cluster, ClusterId};
24pub mod can_send;
25pub use can_send::CanSend;
27pub mod tick;
28pub use tick::{Atomic, NoTick, Tick};
/// Identifier for a location in a flow.
///
/// Process, cluster and external-process ids each wrap a raw numeric id. A tick
/// id additionally records the id of the location it is nested inside, so tick
/// ids can be nested arbitrarily deep.
#[derive(PartialEq, Eq, Clone, Debug, Hash)]
pub enum LocationId {
    /// A process, identified by its raw numeric id.
    Process(usize),
    /// A cluster, identified by its raw numeric id.
    Cluster(usize),
    /// A tick: a numeric id for the tick itself (presumably its clock id) plus
    /// the id of the location that the tick is nested in.
    Tick(usize, Box<LocationId>),
    /// An external process, identified by its raw numeric id.
    ExternalProcess(usize),
}
38impl LocationId {
39    pub fn root(&self) -> &LocationId {
40        match self {
41            LocationId::Process(_) => self,
42            LocationId::Cluster(_) => self,
43            LocationId::Tick(_, id) => id.root(),
44            LocationId::ExternalProcess(_) => self,
45        }
46    }
48    pub fn raw_id(&self) -> usize {
49        match self {
50            LocationId::Process(id) => *id,
51            LocationId::Cluster(id) => *id,
52            LocationId::Tick(_, _) => panic!("cannot get raw id for tick"),
53            LocationId::ExternalProcess(id) => *id,
54        }
55    }
58pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
59    assert_eq!(l1.id(), l2.id(), "locations do not match");
62pub trait Location<'a>: Clone {
63    type Root: Location<'a>;
65    fn root(&self) -> Self::Root;
67    fn id(&self) -> LocationId;
69    fn flow_state(&self) -> &FlowState;
71    fn is_top_level() -> bool;
73    fn tick(&self) -> Tick<Self>
74    where
75        Self: NoTick,
76    {
77        let next_id = self.flow_state().borrow_mut().next_clock_id;
78        self.flow_state().borrow_mut().next_clock_id += 1;
79        Tick {
80            id: next_id,
81            l: self.clone(),
82        }
83    }
85    fn next_node_id(&self) -> usize {
86        let next_id = self.flow_state().borrow_mut().next_node_id;
87        self.flow_state().borrow_mut().next_node_id += 1;
88        next_id
89    }
91    fn new_node_metadata<T>(&self) -> HydroIrMetadata {
92        HydroIrMetadata {
93            location_kind: self.id(),
94            output_type: Some(DebugType(stageleft::quote_type::<T>())),
95            cardinality: None,
96            cpu_usage: None,
97        }
98    }
100    fn spin(&self) -> Stream<(), Self, Unbounded>
101    where
102        Self: Sized + NoTick,
103    {
104        Stream::new(
105            self.clone(),
106            HydroNode::Persist {
107                inner: Box::new(HydroNode::Source {
108                    source: HydroSource::Spin(),
109                    location_kind: self.id(),
110                    metadata: self.new_node_metadata::<()>(),
111                }),
112                metadata: self.new_node_metadata::<()>(),
113            },
114        )
115    }
117    fn source_stream<T, E: FuturesStream<Item = T> + Unpin>(
118        &self,
119        e: impl QuotedWithContext<'a, E, Self>,
120    ) -> Stream<T, Self, Unbounded>
121    where
122        Self: Sized + NoTick,
123    {
124        let e = e.splice_untyped_ctx(self);
126        Stream::new(
127            self.clone(),
128            HydroNode::Persist {
129                inner: Box::new(HydroNode::Source {
130                    source: HydroSource::Stream(e.into()),
131                    location_kind: self.id(),
132                    metadata: self.new_node_metadata::<T>(),
133                }),
134                metadata: self.new_node_metadata::<T>(),
135            },
136        )
137    }
139    fn source_iter<T, E: IntoIterator<Item = T>>(
140        &self,
141        e: impl QuotedWithContext<'a, E, Self>,
142    ) -> Stream<T, Self, Unbounded>
143    where
144        Self: Sized + NoTick,
145    {
146        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
147        // for bounded top-level streams, and this is the only way to generate one
148        let e = e.splice_untyped_ctx(self);
150        Stream::new(
151            self.clone(),
152            HydroNode::Persist {
153                inner: Box::new(HydroNode::Source {
154                    source: HydroSource::Iter(e.into()),
155                    location_kind: self.id(),
156                    metadata: self.new_node_metadata::<T>(),
157                }),
158                metadata: self.new_node_metadata::<T>(),
159            },
160        )
161    }
163    fn singleton<T: Clone>(
164        &self,
165        e: impl QuotedWithContext<'a, T, Self>,
166    ) -> Singleton<T, Self, Unbounded>
167    where
168        Self: Sized + NoTick,
169    {
170        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
171        // for bounded top-level singletons, and this is the only way to generate one
173        let e_arr = q!([e]);
174        let e = e_arr.splice_untyped_ctx(self);
176        // we do a double persist here because if the singleton shows up on every tick,
177        // we first persist the source so that we store that value and then persist again
178        // so that it grows every tick
179        Singleton::new(
180            self.clone(),
181            HydroNode::Persist {
182                inner: Box::new(HydroNode::Persist {
183                    inner: Box::new(HydroNode::Source {
184                        source: HydroSource::Iter(e.into()),
185                        location_kind: self.id(),
186                        metadata: self.new_node_metadata::<T>(),
187                    }),
188                    metadata: self.new_node_metadata::<T>(),
189                }),
190                metadata: self.new_node_metadata::<T>(),
191            },
192        )
193    }
195    /// Generates a stream with values emitted at a fixed interval, with
196    /// each value being the current time (as an [`tokio::time::Instant`]).
197    ///
198    /// The clock source used is monotonic, so elements will be emitted in
199    /// increasing order.
200    ///
201    /// # Safety
202    /// Because this stream is generated by an OS timer, it will be
203    /// non-deterministic because each timestamp will be arbitrary.
204    unsafe fn source_interval(
205        &self,
206        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
207    ) -> Stream<tokio::time::Instant, Self, Unbounded>
208    where
209        Self: Sized + NoTick,
210    {
211        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
212            tokio::time::interval(interval)
213        )))
214    }
216    /// Generates a stream with values emitted at a fixed interval (with an
217    /// initial delay), with each value being the current time
218    /// (as an [`tokio::time::Instant`]).
219    ///
220    /// The clock source used is monotonic, so elements will be emitted in
221    /// increasing order.
222    ///
223    /// # Safety
224    /// Because this stream is generated by an OS timer, it will be
225    /// non-deterministic because each timestamp will be arbitrary.
226    unsafe fn source_interval_delayed(
227        &self,
228        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
229        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
230    ) -> Stream<tokio::time::Instant, Self, Unbounded>
231    where
232        Self: Sized + NoTick,
233    {
234        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
235            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
236        )))
237    }
239    fn forward_ref<S: CycleCollection<'a, ForwardRefMarker, Location = Self>>(
240        &self,
241    ) -> (ForwardRef<'a, S>, S)
242    where
243        Self: NoTick,
244    {
245        let next_id = {
246            let on_id = match self.id() {
247                LocationId::Process(id) => id,
248                LocationId::Cluster(id) => id,
249                LocationId::Tick(_, _) => panic!(),
250                LocationId::ExternalProcess(_) => panic!(),
251            };
253            let mut flow_state = self.flow_state().borrow_mut();
254            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();
256            let id = *next_id_entry;
257            *next_id_entry += 1;
258            id
259        };
261        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
263        (
264            ForwardRef {
265                completed: false,
266                ident: ident.clone(),
267                expected_location: self.id(),
268                _phantom: PhantomData,
269            },
270            S::create_source(ident, self.clone()),
271        )
272    }