hydro_lang/deploy/
mod.rs

1use std::future::Future;
2use std::io::Error;
3use std::pin::Pin;
4
5use bytes::Bytes;
6use dfir_lang::graph::DfirGraph;
7use futures::{Sink, Stream};
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use stageleft::QuotedWithContext;
11
12pub mod macro_runtime;
13pub use macro_runtime::*;
14
15#[cfg(feature = "deploy")]
16#[cfg(stageleft_runtime)]
17pub(crate) mod trybuild;
18
19#[cfg(feature = "deploy")]
20#[cfg(stageleft_runtime)]
21mod trybuild_rewriters;
22
23#[cfg(feature = "deploy")]
24#[cfg(stageleft_runtime)]
25#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
26pub use trybuild::init_test;
27
28#[cfg(feature = "deploy")]
29#[cfg(stageleft_runtime)]
30#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
31pub mod deploy_graph;
32
33#[cfg(feature = "deploy")]
34#[cfg(stageleft_runtime)]
35#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
36pub use deploy_graph::*;
37
38pub mod in_memory_graph;
39pub use in_memory_graph::*;
40
41pub trait LocalDeploy<'a> {
42    type Process: Node<Meta = Self::Meta>;
43    type Cluster: Node<Meta = Self::Meta>;
44    type ExternalProcess: Node<Meta = Self::Meta>;
45    type Meta: Default;
46    type GraphId;
47
48    fn has_trivial_node() -> bool {
49        false
50    }
51
52    fn trivial_process(_id: usize) -> Self::Process {
53        panic!("No trivial process")
54    }
55
56    fn trivial_cluster(_id: usize) -> Self::Cluster {
57        panic!("No trivial cluster")
58    }
59
60    fn trivial_external(_id: usize) -> Self::ExternalProcess {
61        panic!("No trivial external")
62    }
63}
64
65pub trait Deploy<'a> {
66    type InstantiateEnv;
67    type CompileEnv;
68
69    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
70    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
71    type ExternalProcess: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
72        + RegisterPort<'a, Self>;
73    type Port: Clone;
74    type ExternalRawPort;
75    type Meta: Default;
76
77    /// Type of ID used to switch between different subgraphs at runtime.
78    type GraphId;
79
80    fn has_trivial_node() -> bool {
81        false
82    }
83
84    fn trivial_process(_id: usize) -> Self::Process {
85        panic!("No trivial process")
86    }
87
88    fn trivial_cluster(_id: usize) -> Self::Cluster {
89        panic!("No trivial cluster")
90    }
91
92    fn allocate_process_port(process: &Self::Process) -> Self::Port;
93    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
94    fn allocate_external_port(external: &Self::ExternalProcess) -> Self::Port;
95
96    fn o2o_sink_source(
97        compile_env: &Self::CompileEnv,
98        p1: &Self::Process,
99        p1_port: &Self::Port,
100        p2: &Self::Process,
101        p2_port: &Self::Port,
102    ) -> (syn::Expr, syn::Expr);
103    fn o2o_connect(
104        p1: &Self::Process,
105        p1_port: &Self::Port,
106        p2: &Self::Process,
107        p2_port: &Self::Port,
108    ) -> Box<dyn FnOnce()>;
109
110    fn o2m_sink_source(
111        compile_env: &Self::CompileEnv,
112        p1: &Self::Process,
113        p1_port: &Self::Port,
114        c2: &Self::Cluster,
115        c2_port: &Self::Port,
116    ) -> (syn::Expr, syn::Expr);
117    fn o2m_connect(
118        p1: &Self::Process,
119        p1_port: &Self::Port,
120        c2: &Self::Cluster,
121        c2_port: &Self::Port,
122    ) -> Box<dyn FnOnce()>;
123
124    fn m2o_sink_source(
125        compile_env: &Self::CompileEnv,
126        c1: &Self::Cluster,
127        c1_port: &Self::Port,
128        p2: &Self::Process,
129        p2_port: &Self::Port,
130    ) -> (syn::Expr, syn::Expr);
131    fn m2o_connect(
132        c1: &Self::Cluster,
133        c1_port: &Self::Port,
134        p2: &Self::Process,
135        p2_port: &Self::Port,
136    ) -> Box<dyn FnOnce()>;
137
138    fn m2m_sink_source(
139        compile_env: &Self::CompileEnv,
140        c1: &Self::Cluster,
141        c1_port: &Self::Port,
142        c2: &Self::Cluster,
143        c2_port: &Self::Port,
144    ) -> (syn::Expr, syn::Expr);
145    fn m2m_connect(
146        c1: &Self::Cluster,
147        c1_port: &Self::Port,
148        c2: &Self::Cluster,
149        c2_port: &Self::Port,
150    ) -> Box<dyn FnOnce()>;
151
152    fn e2o_source(
153        compile_env: &Self::CompileEnv,
154        p1: &Self::ExternalProcess,
155        p1_port: &Self::Port,
156        p2: &Self::Process,
157        p2_port: &Self::Port,
158    ) -> syn::Expr;
159    fn e2o_connect(
160        p1: &Self::ExternalProcess,
161        p1_port: &Self::Port,
162        p2: &Self::Process,
163        p2_port: &Self::Port,
164    ) -> Box<dyn FnOnce()>;
165
166    fn o2e_sink(
167        compile_env: &Self::CompileEnv,
168        p1: &Self::Process,
169        p1_port: &Self::Port,
170        p2: &Self::ExternalProcess,
171        p2_port: &Self::Port,
172    ) -> syn::Expr;
173    fn o2e_connect(
174        p1: &Self::Process,
175        p1_port: &Self::Port,
176        p2: &Self::ExternalProcess,
177        p2_port: &Self::Port,
178    ) -> Box<dyn FnOnce()>;
179
180    fn cluster_ids(
181        env: &Self::CompileEnv,
182        of_cluster: usize,
183    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
184    fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
185}
186
187impl<'a, T, N, C, E, M, R> LocalDeploy<'a> for T
188where
189    T: Deploy<'a, Process = N, Cluster = C, ExternalProcess = E, Meta = M, GraphId = R>,
190    N: Node<Meta = M>,
191    C: Node<Meta = M>,
192    E: Node<Meta = M>,
193    M: Default,
194{
195    type Process = N;
196    type Cluster = C;
197    type ExternalProcess = E;
198    type Meta = M;
199    type GraphId = R;
200
201    fn has_trivial_node() -> bool {
202        <T as Deploy<'a>>::has_trivial_node()
203    }
204
205    fn trivial_process(id: usize) -> Self::Process {
206        <T as Deploy<'a>>::trivial_process(id)
207    }
208
209    fn trivial_cluster(id: usize) -> Self::Cluster {
210        <T as Deploy<'a>>::trivial_cluster(id)
211    }
212}
213
214pub trait ProcessSpec<'a, D>
215where
216    D: LocalDeploy<'a> + ?Sized,
217{
218    fn build(self, id: usize, name_hint: &str) -> D::Process;
219}
220
221pub trait IntoProcessSpec<'a, D>
222where
223    D: LocalDeploy<'a> + ?Sized,
224{
225    type ProcessSpec: ProcessSpec<'a, D>;
226    fn into_process_spec(self) -> Self::ProcessSpec;
227}
228
229impl<'a, D, T> IntoProcessSpec<'a, D> for T
230where
231    D: LocalDeploy<'a> + ?Sized,
232    T: ProcessSpec<'a, D>,
233{
234    type ProcessSpec = T;
235    fn into_process_spec(self) -> Self::ProcessSpec {
236        self
237    }
238}
239
240pub trait ClusterSpec<'a, D>
241where
242    D: LocalDeploy<'a> + ?Sized,
243{
244    fn build(self, id: usize, name_hint: &str) -> D::Cluster;
245}
246
247pub trait ExternalSpec<'a, D>
248where
249    D: LocalDeploy<'a> + ?Sized,
250{
251    fn build(self, id: usize, name_hint: &str) -> D::ExternalProcess;
252}
253
254pub trait Node {
255    type Port;
256    type Meta;
257    type InstantiateEnv;
258
259    fn next_port(&self) -> Self::Port;
260
261    fn update_meta(&mut self, meta: &Self::Meta);
262
263    fn instantiate(
264        &self,
265        env: &mut Self::InstantiateEnv,
266        meta: &mut Self::Meta,
267        graph: DfirGraph,
268        extra_stmts: Vec<syn::Stmt>,
269    );
270}
271
272pub trait RegisterPort<'a, D>: Clone
273where
274    D: Deploy<'a> + ?Sized,
275{
276    fn register(&self, key: usize, port: D::Port);
277    fn raw_port(&self, key: usize) -> D::ExternalRawPort;
278
279    fn as_bytes_sink(
280        &self,
281        key: usize,
282    ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = Error>>>> + 'a;
283
284    fn as_bincode_sink<T>(
285        &self,
286        key: usize,
287    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
288    where
289        T: Serialize + 'static;
290
291    fn as_bytes_source(
292        &self,
293        key: usize,
294    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a;
295
296    fn as_bincode_source<T>(
297        &self,
298        key: usize,
299    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
300    where
301        T: DeserializeOwned + 'static;
302}