hydro_lang/deploy/
mod.rs

1use std::future::Future;
2use std::io::Error;
3use std::pin::Pin;
4
5use dfir_lang::graph::DfirGraph;
6use dfir_rs::bytes::Bytes;
7use dfir_rs::futures::{Sink, Stream};
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use stageleft::QuotedWithContext;
11
12pub mod macro_runtime;
13pub use macro_runtime::*;
14
15#[cfg(feature = "deploy")]
16#[cfg(stageleft_runtime)]
17pub(crate) mod trybuild;
18
19#[cfg(feature = "deploy")]
20#[cfg(stageleft_runtime)]
21mod trybuild_rewriters;
22
23#[cfg(feature = "deploy")]
24#[cfg(stageleft_runtime)]
25#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
26pub use trybuild::init_test;
27
28#[cfg(feature = "deploy")]
29#[cfg(stageleft_runtime)]
30#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
31pub mod deploy_graph;
32
33#[cfg(feature = "deploy")]
34#[cfg(stageleft_runtime)]
35#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
36pub use deploy_graph::*;
37
38pub mod in_memory_graph;
39pub use in_memory_graph::*;
40
41pub trait LocalDeploy<'a> {
42    type Process: Node<Meta = Self::Meta>;
43    type Cluster: Node<Meta = Self::Meta>;
44    type ExternalProcess: Node<Meta = Self::Meta>;
45    type Meta: Default;
46    type GraphId;
47
48    fn has_trivial_node() -> bool {
49        false
50    }
51
52    fn trivial_process(_id: usize) -> Self::Process {
53        panic!("No trivial process")
54    }
55
56    fn trivial_cluster(_id: usize) -> Self::Cluster {
57        panic!("No trivial cluster")
58    }
59
60    fn trivial_external(_id: usize) -> Self::ExternalProcess {
61        panic!("No trivial external")
62    }
63}
64
65pub trait Deploy<'a> {
66    type InstantiateEnv;
67    type CompileEnv;
68
69    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
70    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
71    type ExternalProcess: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
72        + RegisterPort<'a, Self>;
73    type Port: Clone;
74    type ExternalRawPort;
75    type Meta: Default;
76
77    /// Type of ID used to switch between different subgraphs at runtime.
78    type GraphId;
79
80    fn has_trivial_node() -> bool {
81        false
82    }
83
84    fn trivial_process(_id: usize) -> Self::Process {
85        panic!("No trivial process")
86    }
87
88    fn trivial_cluster(_id: usize) -> Self::Cluster {
89        panic!("No trivial cluster")
90    }
91
92    fn allocate_process_port(process: &Self::Process) -> Self::Port;
93    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
94    fn allocate_external_port(external: &Self::ExternalProcess) -> Self::Port;
95
96    fn o2o_sink_source(
97        compile_env: &Self::CompileEnv,
98        p1: &Self::Process,
99        p1_port: &Self::Port,
100        p2: &Self::Process,
101        p2_port: &Self::Port,
102    ) -> (syn::Expr, syn::Expr);
103    fn o2o_connect(
104        p1: &Self::Process,
105        p1_port: &Self::Port,
106        p2: &Self::Process,
107        p2_port: &Self::Port,
108    ) -> Box<dyn FnOnce()>;
109
110    fn o2m_sink_source(
111        compile_env: &Self::CompileEnv,
112        p1: &Self::Process,
113        p1_port: &Self::Port,
114        c2: &Self::Cluster,
115        c2_port: &Self::Port,
116    ) -> (syn::Expr, syn::Expr);
117    fn o2m_connect(
118        p1: &Self::Process,
119        p1_port: &Self::Port,
120        c2: &Self::Cluster,
121        c2_port: &Self::Port,
122    ) -> Box<dyn FnOnce()>;
123
124    fn m2o_sink_source(
125        compile_env: &Self::CompileEnv,
126        c1: &Self::Cluster,
127        c1_port: &Self::Port,
128        p2: &Self::Process,
129        p2_port: &Self::Port,
130    ) -> (syn::Expr, syn::Expr);
131    fn m2o_connect(
132        c1: &Self::Cluster,
133        c1_port: &Self::Port,
134        p2: &Self::Process,
135        p2_port: &Self::Port,
136    ) -> Box<dyn FnOnce()>;
137
138    fn m2m_sink_source(
139        compile_env: &Self::CompileEnv,
140        c1: &Self::Cluster,
141        c1_port: &Self::Port,
142        c2: &Self::Cluster,
143        c2_port: &Self::Port,
144    ) -> (syn::Expr, syn::Expr);
145    fn m2m_connect(
146        c1: &Self::Cluster,
147        c1_port: &Self::Port,
148        c2: &Self::Cluster,
149        c2_port: &Self::Port,
150    ) -> Box<dyn FnOnce()>;
151
152    fn e2o_source(
153        compile_env: &Self::CompileEnv,
154        p1: &Self::ExternalProcess,
155        p1_port: &Self::Port,
156        p2: &Self::Process,
157        p2_port: &Self::Port,
158    ) -> syn::Expr;
159    fn e2o_connect(
160        p1: &Self::ExternalProcess,
161        p1_port: &Self::Port,
162        p2: &Self::Process,
163        p2_port: &Self::Port,
164    ) -> Box<dyn FnOnce()>;
165
166    fn o2e_sink(
167        compile_env: &Self::CompileEnv,
168        p1: &Self::Process,
169        p1_port: &Self::Port,
170        p2: &Self::ExternalProcess,
171        p2_port: &Self::Port,
172    ) -> syn::Expr;
173    fn o2e_connect(
174        p1: &Self::Process,
175        p1_port: &Self::Port,
176        p2: &Self::ExternalProcess,
177        p2_port: &Self::Port,
178    ) -> Box<dyn FnOnce()>;
179
180    fn cluster_ids(
181        env: &Self::CompileEnv,
182        of_cluster: usize,
183    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
184    fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
185}
186
187impl<
188    'a,
189    T: Deploy<'a, Process = N, Cluster = C, ExternalProcess = E, Meta = M, GraphId = R>,
190    N: Node<Meta = M>,
191    C: Node<Meta = M>,
192    E: Node<Meta = M>,
193    M: Default,
194    R,
195> LocalDeploy<'a> for T
196{
197    type Process = N;
198    type Cluster = C;
199    type ExternalProcess = E;
200    type Meta = M;
201    type GraphId = R;
202
203    fn has_trivial_node() -> bool {
204        <T as Deploy<'a>>::has_trivial_node()
205    }
206
207    fn trivial_process(id: usize) -> Self::Process {
208        <T as Deploy<'a>>::trivial_process(id)
209    }
210
211    fn trivial_cluster(id: usize) -> Self::Cluster {
212        <T as Deploy<'a>>::trivial_cluster(id)
213    }
214}
215
216pub trait ProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> {
217    fn build(self, id: usize, name_hint: &str) -> D::Process;
218}
219
220pub trait IntoProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> {
221    type ProcessSpec: ProcessSpec<'a, D>;
222    fn into_process_spec(self) -> Self::ProcessSpec;
223}
224
225impl<'a, D: LocalDeploy<'a> + ?Sized, T: ProcessSpec<'a, D>> IntoProcessSpec<'a, D> for T {
226    type ProcessSpec = T;
227    fn into_process_spec(self) -> Self::ProcessSpec {
228        self
229    }
230}
231
232pub trait ClusterSpec<'a, D: LocalDeploy<'a> + ?Sized> {
233    fn build(self, id: usize, name_hint: &str) -> D::Cluster;
234}
235
236pub trait ExternalSpec<'a, D: LocalDeploy<'a> + ?Sized> {
237    fn build(self, id: usize, name_hint: &str) -> D::ExternalProcess;
238}
239
240pub trait Node {
241    type Port;
242    type Meta;
243    type InstantiateEnv;
244
245    fn next_port(&self) -> Self::Port;
246
247    fn update_meta(&mut self, meta: &Self::Meta);
248
249    fn instantiate(
250        &self,
251        env: &mut Self::InstantiateEnv,
252        meta: &mut Self::Meta,
253        graph: DfirGraph,
254        extra_stmts: Vec<syn::Stmt>,
255    );
256}
257
258pub trait RegisterPort<'a, D: Deploy<'a> + ?Sized>: Clone {
259    fn register(&self, key: usize, port: D::Port);
260    fn raw_port(&self, key: usize) -> D::ExternalRawPort;
261
262    fn as_bytes_sink(
263        &self,
264        key: usize,
265    ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = Error>>>> + 'a;
266
267    fn as_bincode_sink<T: Serialize + 'static>(
268        &self,
269        key: usize,
270    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a;
271
272    fn as_bytes_source(
273        &self,
274        key: usize,
275    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a;
276
277    fn as_bincode_source<T: DeserializeOwned + 'static>(
278        &self,
279        key: usize,
280    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a;
281}