hydro_lang/location/
mod.rs

1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use futures::stream::Stream as FuturesStream;
6use proc_macro2::Span;
7use stageleft::{QuotedWithContext, q};
8
9use super::builder::FlowState;
10use crate::cycle::{CycleCollection, ForwardRef, ForwardRefMarker};
11use crate::ir::{HydroIrMetadata, HydroNode, HydroSource};
12use crate::stream::ExactlyOnce;
13use crate::{Singleton, Stream, TotalOrder, Unbounded};
14
15pub mod external_process;
16pub use external_process::ExternalProcess;
17
18pub mod process;
19pub use process::Process;
20
21pub mod cluster;
22pub use cluster::{Cluster, ClusterId};
23
24pub mod can_send;
25pub use can_send::CanSend;
26
27pub mod tick;
28pub use tick::{Atomic, NoTick, Tick};
29
/// Identifier for a location, as stored in IR metadata and compared by
/// [`check_matching_location`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum LocationId {
    /// A process, carrying its numeric id.
    Process(usize),
    /// A cluster, carrying its numeric id.
    Cluster(usize),
    /// A tick, carrying its own numeric id and the id of the location it wraps
    /// (the wrapped id is what [`LocationId::root`] descends into).
    Tick(usize, Box<LocationId>),
    /// An external process, carrying its numeric id.
    ExternalProcess(usize),
}

impl LocationId {
    /// Returns the innermost non-tick id, peeling off any number of nested
    /// `Tick` wrappers. Non-tick ids are their own root.
    pub fn root(&self) -> &LocationId {
        match self {
            Self::Tick(_, wrapped) => wrapped.root(),
            Self::Process(_) | Self::Cluster(_) | Self::ExternalProcess(_) => self,
        }
    }

    /// Returns the numeric id carried by a process, cluster, or external
    /// process id.
    ///
    /// # Panics
    /// Panics with `"cannot get raw id for tick"` when called on a
    /// `LocationId::Tick`; call [`LocationId::root`] first if the tick's
    /// underlying location is what you need.
    pub fn raw_id(&self) -> usize {
        match self {
            Self::Process(raw) | Self::Cluster(raw) | Self::ExternalProcess(raw) => *raw,
            Self::Tick(_, _) => panic!("cannot get raw id for tick"),
        }
    }
}
57
58pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
59    assert_eq!(l1.id(), l2.id(), "locations do not match");
60}
61
62pub trait Location<'a>: Clone {
63    type Root: Location<'a>;
64
65    fn root(&self) -> Self::Root;
66
67    fn id(&self) -> LocationId;
68
69    fn flow_state(&self) -> &FlowState;
70
71    fn is_top_level() -> bool;
72
73    fn tick(&self) -> Tick<Self>
74    where
75        Self: NoTick,
76    {
77        let next_id = self.flow_state().borrow_mut().next_clock_id;
78        self.flow_state().borrow_mut().next_clock_id += 1;
79        Tick {
80            id: next_id,
81            l: self.clone(),
82        }
83    }
84
85    fn next_node_id(&self) -> usize {
86        let next_id = self.flow_state().borrow_mut().next_node_id;
87        self.flow_state().borrow_mut().next_node_id += 1;
88        next_id
89    }
90
91    fn new_node_metadata<T>(&self) -> HydroIrMetadata {
92        HydroIrMetadata {
93            location_kind: self.id(),
94            output_type: Some(stageleft::quote_type::<T>().into()),
95            cardinality: None,
96            cpu_usage: None,
97        }
98    }
99
100    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
101    where
102        Self: Sized + NoTick,
103    {
104        Stream::new(
105            self.clone(),
106            HydroNode::Persist {
107                inner: Box::new(HydroNode::Source {
108                    source: HydroSource::Spin(),
109                    location_kind: self.id(),
110                    metadata: self.new_node_metadata::<()>(),
111                }),
112                metadata: self.new_node_metadata::<()>(),
113            },
114        )
115    }
116
117    fn source_stream<T, E>(
118        &self,
119        e: impl QuotedWithContext<'a, E, Self>,
120    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
121    where
122        E: FuturesStream<Item = T> + Unpin,
123        Self: Sized + NoTick,
124    {
125        let e = e.splice_untyped_ctx(self);
126
127        Stream::new(
128            self.clone(),
129            HydroNode::Persist {
130                inner: Box::new(HydroNode::Source {
131                    source: HydroSource::Stream(e.into()),
132                    location_kind: self.id(),
133                    metadata: self.new_node_metadata::<T>(),
134                }),
135                metadata: self.new_node_metadata::<T>(),
136            },
137        )
138    }
139
140    fn source_iter<T, E>(
141        &self,
142        e: impl QuotedWithContext<'a, E, Self>,
143    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
144    where
145        E: IntoIterator<Item = T>,
146        Self: Sized + NoTick,
147    {
148        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
149        // for bounded top-level streams, and this is the only way to generate one
150        let e = e.splice_untyped_ctx(self);
151
152        Stream::new(
153            self.clone(),
154            HydroNode::Persist {
155                inner: Box::new(HydroNode::Source {
156                    source: HydroSource::Iter(e.into()),
157                    location_kind: self.id(),
158                    metadata: self.new_node_metadata::<T>(),
159                }),
160                metadata: self.new_node_metadata::<T>(),
161            },
162        )
163    }
164
165    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
166    where
167        T: Clone,
168        Self: Sized + NoTick,
169    {
170        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
171        // for bounded top-level singletons, and this is the only way to generate one
172
173        let e_arr = q!([e]);
174        let e = e_arr.splice_untyped_ctx(self);
175
176        // we do a double persist here because if the singleton shows up on every tick,
177        // we first persist the source so that we store that value and then persist again
178        // so that it grows every tick
179        Singleton::new(
180            self.clone(),
181            HydroNode::Persist {
182                inner: Box::new(HydroNode::Persist {
183                    inner: Box::new(HydroNode::Source {
184                        source: HydroSource::Iter(e.into()),
185                        location_kind: self.id(),
186                        metadata: self.new_node_metadata::<T>(),
187                    }),
188                    metadata: self.new_node_metadata::<T>(),
189                }),
190                metadata: self.new_node_metadata::<T>(),
191            },
192        )
193    }
194
195    /// Generates a stream with values emitted at a fixed interval, with
196    /// each value being the current time (as an [`tokio::time::Instant`]).
197    ///
198    /// The clock source used is monotonic, so elements will be emitted in
199    /// increasing order.
200    ///
201    /// # Safety
202    /// Because this stream is generated by an OS timer, it will be
203    /// non-deterministic because each timestamp will be arbitrary.
204    unsafe fn source_interval(
205        &self,
206        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
207    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
208    where
209        Self: Sized + NoTick,
210    {
211        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
212            tokio::time::interval(interval)
213        )))
214    }
215
216    /// Generates a stream with values emitted at a fixed interval (with an
217    /// initial delay), with each value being the current time
218    /// (as an [`tokio::time::Instant`]).
219    ///
220    /// The clock source used is monotonic, so elements will be emitted in
221    /// increasing order.
222    ///
223    /// # Safety
224    /// Because this stream is generated by an OS timer, it will be
225    /// non-deterministic because each timestamp will be arbitrary.
226    unsafe fn source_interval_delayed(
227        &self,
228        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
229        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
230    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
231    where
232        Self: Sized + NoTick,
233    {
234        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
235            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
236        )))
237    }
238
239    fn forward_ref<S>(&self) -> (ForwardRef<'a, S>, S)
240    where
241        S: CycleCollection<'a, ForwardRefMarker, Location = Self>,
242        Self: NoTick,
243    {
244        let next_id = self.flow_state().borrow_mut().next_cycle_id();
245        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
246
247        (
248            ForwardRef {
249                completed: false,
250                ident: ident.clone(),
251                expected_location: self.id(),
252                _phantom: PhantomData,
253            },
254            S::create_source(ident, self.clone()),
255        )
256    }
257}