hydro_lang/deploy/
mod.rs

1use std::future::Future;
2use std::io::Error;
3use std::pin::Pin;
4
5use bytes::{Bytes, BytesMut};
6use dfir_lang::graph::DfirGraph;
7use futures::{Sink, Stream};
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use stageleft::QuotedWithContext;
11
12#[cfg(stageleft_runtime)]
13#[cfg(feature = "deploy")]
14pub(crate) mod trybuild;
15
16#[cfg(stageleft_runtime)]
17#[cfg(feature = "deploy")]
18mod trybuild_rewriters;
19
20#[cfg(stageleft_runtime)]
21#[cfg(feature = "deploy")]
22#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
23pub use trybuild::init_test;
24
25#[cfg(stageleft_runtime)]
26#[cfg(feature = "deploy")]
27#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
28pub mod deploy_graph;
29
30#[cfg(stageleft_runtime)]
31#[cfg(feature = "deploy")]
32#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
33pub use deploy_graph::*;
34
35use crate::NetworkHint;
36
/// Interface a deployment target implements.
///
/// Groups the target's node types (`Process`, `Cluster`, `External`), its port
/// type, and associated functions that allocate ports, build `syn::Expr`
/// values for sinks/sources between pairs of nodes, and return boxed `FnOnce`
/// closures for connecting those pairs.
///
/// NOTE(review): the `o2o` / `o2m` / `m2o` / `m2m` / `e2o` / `o2e` prefixes
/// appear to abbreviate the endpoint kinds (process = "one", cluster = "many",
/// "e" = external); this is inferred from the parameter types only — confirm.
37pub trait Deploy<'a> {
    /// Environment type; every node type below must use it as its
    /// `Node::InstantiateEnv`.
38    type InstantiateEnv;
    /// Environment type passed by shared reference to the expression-building
    /// functions (`*_sink_source`, `*_source`, `*_sink`, `cluster_*`).
39    type CompileEnv;
40
    /// Node type for a process; must be `Clone`.
41    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
    /// Node type for a cluster; must be `Clone`.
42    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
    /// Node type for an external; additionally implements `RegisterPort`.
43    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
44        + RegisterPort<'a, Self>;
    /// Port type returned by the `allocate_*_port` functions and passed to the
    /// sink/source/connect functions.
45    type Port: Clone;
    /// Type returned by `RegisterPort::raw_port`.
46    type ExternalRawPort;
    /// Metadata type shared by all three node types (`Node::Meta`).
47    type Meta: Default;
48
49    /// Type of ID used to switch between different subgraphs at runtime.
50    type GraphId;
51
    /// Returns `false` unless overridden.
    ///
    /// NOTE(review): presumably an implementor returning `true` also overrides
    /// the `trivial_*` constructors below, whose defaults panic — confirm.
52    fn has_trivial_node() -> bool {
53        false
54    }
55
    /// Default implementation ignores `_id` and panics with
    /// `"No trivial process"`.
56    fn trivial_process(_id: usize) -> Self::Process {
57        panic!("No trivial process")
58    }
59
    /// Default implementation ignores `_id` and panics with
    /// `"No trivial cluster"`.
60    fn trivial_cluster(_id: usize) -> Self::Cluster {
61        panic!("No trivial cluster")
62    }
63
    /// Default implementation ignores `_id` and panics with
    /// `"No trivial external process"`.
64    fn trivial_external(_id: usize) -> Self::External {
65        panic!("No trivial external process")
66    }
67
    // Port allocation: one function per node kind, each returning a `Self::Port`.
68    fn allocate_process_port(process: &Self::Process) -> Self::Port;
69    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
70    fn allocate_external_port(external: &Self::External) -> Self::Port;
71
    // Process -> process. `*_sink_source` returns a pair of expressions
    // (presumably (sink, source), going by the name — confirm the order);
    // `*_connect` returns a boxed one-shot closure.
72    fn o2o_sink_source(
73        compile_env: &Self::CompileEnv,
74        p1: &Self::Process,
75        p1_port: &Self::Port,
76        p2: &Self::Process,
77        p2_port: &Self::Port,
78    ) -> (syn::Expr, syn::Expr);
79    fn o2o_connect(
80        p1: &Self::Process,
81        p1_port: &Self::Port,
82        p2: &Self::Process,
83        p2_port: &Self::Port,
84    ) -> Box<dyn FnOnce()>;
85
    // Process -> cluster; same shape as the `o2o_*` pair.
86    fn o2m_sink_source(
87        compile_env: &Self::CompileEnv,
88        p1: &Self::Process,
89        p1_port: &Self::Port,
90        c2: &Self::Cluster,
91        c2_port: &Self::Port,
92    ) -> (syn::Expr, syn::Expr);
93    fn o2m_connect(
94        p1: &Self::Process,
95        p1_port: &Self::Port,
96        c2: &Self::Cluster,
97        c2_port: &Self::Port,
98    ) -> Box<dyn FnOnce()>;
99
    // Cluster -> process; same shape as the `o2o_*` pair.
100    fn m2o_sink_source(
101        compile_env: &Self::CompileEnv,
102        c1: &Self::Cluster,
103        c1_port: &Self::Port,
104        p2: &Self::Process,
105        p2_port: &Self::Port,
106    ) -> (syn::Expr, syn::Expr);
107    fn m2o_connect(
108        c1: &Self::Cluster,
109        c1_port: &Self::Port,
110        p2: &Self::Process,
111        p2_port: &Self::Port,
112    ) -> Box<dyn FnOnce()>;
113
    // Cluster -> cluster; same shape as the `o2o_*` pair.
114    fn m2m_sink_source(
115        compile_env: &Self::CompileEnv,
116        c1: &Self::Cluster,
117        c1_port: &Self::Port,
118        c2: &Self::Cluster,
119        c2_port: &Self::Port,
120    ) -> (syn::Expr, syn::Expr);
121    fn m2m_connect(
122        c1: &Self::Cluster,
123        c1_port: &Self::Port,
124        c2: &Self::Cluster,
125        c2_port: &Self::Port,
126    ) -> Box<dyn FnOnce()>;
127
    // "Many" variant of the external -> process source. Takes no external
    // node; `extra_stmts` is mutable, so an implementation may add statements
    // to it. The same `shared_handle` string type is accepted by
    // `e2o_many_sink` (presumably to pair the two — confirm).
128    fn e2o_many_source(
129        compile_env: &Self::CompileEnv,
130        extra_stmts: &mut Vec<syn::Stmt>,
131        p2: &Self::Process,
132        p2_port: &Self::Port,
133        codec_type: &syn::Type,
134        shared_handle: String,
135    ) -> syn::Expr;
136    fn e2o_many_sink(shared_handle: String) -> syn::Expr;
137
    // External -> process: a single source expression, plus a connect closure
    // that also receives a `many` flag and a `NetworkHint`.
138    fn e2o_source(
139        compile_env: &Self::CompileEnv,
140        p1: &Self::External,
141        p1_port: &Self::Port,
142        p2: &Self::Process,
143        p2_port: &Self::Port,
144    ) -> syn::Expr;
145    fn e2o_connect(
146        p1: &Self::External,
147        p1_port: &Self::Port,
148        p2: &Self::Process,
149        p2_port: &Self::Port,
150        many: bool,
151        server_hint: NetworkHint,
152    ) -> Box<dyn FnOnce()>;
153
    // Process -> external: a single sink expression plus a connect closure.
154    fn o2e_sink(
155        compile_env: &Self::CompileEnv,
156        p1: &Self::Process,
157        p1_port: &Self::Port,
158        p2: &Self::External,
159        p2_port: &Self::Port,
160    ) -> syn::Expr;
161    fn o2e_connect(
162        p1: &Self::Process,
163        p1_port: &Self::Port,
164        p2: &Self::External,
165        p2_port: &Self::Port,
166    ) -> Box<dyn FnOnce()>;
167
    // Quoted expressions evaluating to a `&'a [u32]` slice for the cluster
    // numbered `of_cluster`, and to a single `u32` respectively.
    // NOTE(review): presumably the member IDs of that cluster and the ID of
    // the current member — confirm against implementors.
168    fn cluster_ids(
169        env: &Self::CompileEnv,
170        of_cluster: usize,
171    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
172    fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
173}
174
/// A value that can be consumed to produce a `D::Process` for the deployment
/// target `D`.
175pub trait ProcessSpec<'a, D>
176where
177    D: Deploy<'a> + ?Sized,
178{
    /// Consumes the spec and returns the process node.
    ///
    /// `id` and `name_hint` are supplied by the caller; how they are used is
    /// up to the implementor.
179    fn build(self, id: usize, name_hint: &str) -> D::Process;
180}
181
/// Conversion into a [`ProcessSpec`] for the deployment target `D`.
182pub trait IntoProcessSpec<'a, D>
183where
184    D: Deploy<'a> + ?Sized,
185{
    /// The concrete spec type produced by the conversion.
186    type ProcessSpec: ProcessSpec<'a, D>;
    /// Consumes `self` and returns the spec.
187    fn into_process_spec(self) -> Self::ProcessSpec;
188}
189
/// Blanket implementation: every [`ProcessSpec`] is its own
/// [`IntoProcessSpec`], and the conversion returns the value unchanged.
190impl<'a, D, T> IntoProcessSpec<'a, D> for T
191where
192    D: Deploy<'a> + ?Sized,
193    T: ProcessSpec<'a, D>,
194{
195    type ProcessSpec = T;
196    fn into_process_spec(self) -> Self::ProcessSpec {
        // Identity conversion: the spec is already of the target type.
197        self
198    }
199}
200
/// A value that can be consumed to produce a `D::Cluster` for the deployment
/// target `D`.
201pub trait ClusterSpec<'a, D>
202where
203    D: Deploy<'a> + ?Sized,
204{
    /// Consumes the spec and returns the cluster node.
    ///
    /// `id` and `name_hint` are supplied by the caller; how they are used is
    /// up to the implementor.
205    fn build(self, id: usize, name_hint: &str) -> D::Cluster;
206}
207
/// A value that can be consumed to produce a `D::External` for the deployment
/// target `D`.
208pub trait ExternalSpec<'a, D>
209where
210    D: Deploy<'a> + ?Sized,
211{
    /// Consumes the spec and returns the external node.
    ///
    /// `id` and `name_hint` are supplied by the caller; how they are used is
    /// up to the implementor.
212    fn build(self, id: usize, name_hint: &str) -> D::External;
213}
214
/// Common interface of the node types used by [`Deploy`] (`Process`,
/// `Cluster` and `External` are all bounded by this trait).
215pub trait Node {
    /// Port type handed out by [`Node::next_port`].
216    type Port;
    /// Metadata type accepted by [`Node::update_meta`] and
    /// [`Node::instantiate`].
217    type Meta;
    /// Environment type mutated by [`Node::instantiate`].
218    type InstantiateEnv;
219
    /// Returns a port for this node.
    ///
    /// NOTE(review): takes `&self`, so any counter behind "next" would need
    /// interior mutability — confirm against implementors.
220    fn next_port(&self) -> Self::Port;
221
    /// Updates this node from the given metadata (the only method here that
    /// takes `&mut self`).
222    fn update_meta(&mut self, meta: &Self::Meta);
223
    /// Instantiates this node.
    ///
    /// Receives mutable access to the environment and metadata, and takes
    /// ownership of the `DfirGraph` and of the extra statements.
224    fn instantiate(
225        &self,
226        env: &mut Self::InstantiateEnv,
227        meta: &mut Self::Meta,
228        graph: DfirGraph,
229        extra_stmts: Vec<syn::Stmt>,
230    );
231}
232
/// Pair of type-erased endpoints: a pinned, boxed `Stream` yielding `Out`
/// items, followed by a pinned, boxed `Sink` accepting `In` items and failing
/// with `InErr`.
233type DynSourceSink<Out, In, InErr> = (
234    Pin<Box<dyn Stream<Item = Out>>>,
235    Pin<Box<dyn Sink<In, Error = InErr>>>,
236);
237
/// Keyed port registry implemented by `Deploy::External`.
///
/// Ports are registered under a `usize` key; the same kind of key is then used
/// to obtain a raw port or futures that resolve to stream/sink endpoints.
///
/// NOTE(review): the `bincode` in the method names suggests items are encoded
/// with bincode; the signatures only require serde's `Serialize` /
/// `DeserializeOwned` — confirm against implementors.
238pub trait RegisterPort<'a, D>: Clone
239where
240    D: Deploy<'a> + ?Sized,
241{
    /// Registers `port` under `key`.
242    fn register(&self, key: usize, port: D::Port);
    /// Returns the target-specific raw port for `key`.
243    fn raw_port(&self, key: usize) -> D::ExternalRawPort;
244
    /// Returns a future resolving to a (stream, sink) pair over bytes: the
    /// stream yields `Result<BytesMut, Error>`, the sink accepts `Bytes` and
    /// fails with `std::io::Error`.
245    fn as_bytes_bidi(
246        &self,
247        key: usize,
248    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
249
    /// Returns a future resolving to a typed (stream, sink) pair: the stream
    /// yields `OutT`, the sink accepts `InT` and fails with `std::io::Error`.
250    fn as_bincode_bidi<InT, OutT>(
251        &self,
252        key: usize,
253    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
254    where
255        InT: Serialize + 'static,
256        OutT: DeserializeOwned + 'static;
257
    /// Returns a future resolving to a sink that accepts `T` and fails with
    /// `std::io::Error`.
258    fn as_bincode_sink<T>(
259        &self,
260        key: usize,
261    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
262    where
263        T: Serialize + 'static;
264
    /// Returns a future resolving to a stream that yields `T`.
265    fn as_bincode_source<T>(
266        &self,
267        key: usize,
268    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
269    where
270        T: DeserializeOwned + 'static;
271}