// hydro_lang/compile/deploy_provider.rs

1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::location::NetworkHint;
12
13pub trait Deploy<'a> {
14    type InstantiateEnv;
15    type CompileEnv;
16
17    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
18    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
19    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
20        + RegisterPort<'a, Self>;
21    type Port: Clone;
22    type ExternalRawPort;
23    type Meta: Default;
24
25    /// Type of ID used to switch between different subgraphs at runtime.
26    type GraphId;
27
28    fn has_trivial_node() -> bool {
29        false
30    }
31
32    fn trivial_process(_id: usize) -> Self::Process {
33        panic!("No trivial process")
34    }
35
36    fn trivial_cluster(_id: usize) -> Self::Cluster {
37        panic!("No trivial cluster")
38    }
39
40    fn trivial_external(_id: usize) -> Self::External {
41        panic!("No trivial external process")
42    }
43
44    fn allocate_process_port(process: &Self::Process) -> Self::Port;
45    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
46    fn allocate_external_port(external: &Self::External) -> Self::Port;
47
48    fn o2o_sink_source(
49        compile_env: &Self::CompileEnv,
50        p1: &Self::Process,
51        p1_port: &Self::Port,
52        p2: &Self::Process,
53        p2_port: &Self::Port,
54    ) -> (syn::Expr, syn::Expr);
55    fn o2o_connect(
56        p1: &Self::Process,
57        p1_port: &Self::Port,
58        p2: &Self::Process,
59        p2_port: &Self::Port,
60    ) -> Box<dyn FnOnce()>;
61
62    fn o2m_sink_source(
63        compile_env: &Self::CompileEnv,
64        p1: &Self::Process,
65        p1_port: &Self::Port,
66        c2: &Self::Cluster,
67        c2_port: &Self::Port,
68    ) -> (syn::Expr, syn::Expr);
69    fn o2m_connect(
70        p1: &Self::Process,
71        p1_port: &Self::Port,
72        c2: &Self::Cluster,
73        c2_port: &Self::Port,
74    ) -> Box<dyn FnOnce()>;
75
76    fn m2o_sink_source(
77        compile_env: &Self::CompileEnv,
78        c1: &Self::Cluster,
79        c1_port: &Self::Port,
80        p2: &Self::Process,
81        p2_port: &Self::Port,
82    ) -> (syn::Expr, syn::Expr);
83    fn m2o_connect(
84        c1: &Self::Cluster,
85        c1_port: &Self::Port,
86        p2: &Self::Process,
87        p2_port: &Self::Port,
88    ) -> Box<dyn FnOnce()>;
89
90    fn m2m_sink_source(
91        compile_env: &Self::CompileEnv,
92        c1: &Self::Cluster,
93        c1_port: &Self::Port,
94        c2: &Self::Cluster,
95        c2_port: &Self::Port,
96    ) -> (syn::Expr, syn::Expr);
97    fn m2m_connect(
98        c1: &Self::Cluster,
99        c1_port: &Self::Port,
100        c2: &Self::Cluster,
101        c2_port: &Self::Port,
102    ) -> Box<dyn FnOnce()>;
103
104    fn e2o_many_source(
105        compile_env: &Self::CompileEnv,
106        extra_stmts: &mut Vec<syn::Stmt>,
107        p2: &Self::Process,
108        p2_port: &Self::Port,
109        codec_type: &syn::Type,
110        shared_handle: String,
111    ) -> syn::Expr;
112    fn e2o_many_sink(shared_handle: String) -> syn::Expr;
113
114    #[expect(clippy::too_many_arguments, reason = "necessary for code generation")]
115    fn e2o_source(
116        compile_env: &Self::CompileEnv,
117        extra_stmts: &mut Vec<syn::Stmt>,
118        p1: &Self::External,
119        p1_port: &Self::Port,
120        p2: &Self::Process,
121        p2_port: &Self::Port,
122        codec_type: &syn::Type,
123        shared_handle: String,
124    ) -> syn::Expr;
125    fn e2o_connect(
126        p1: &Self::External,
127        p1_port: &Self::Port,
128        p2: &Self::Process,
129        p2_port: &Self::Port,
130        many: bool,
131        server_hint: NetworkHint,
132    ) -> Box<dyn FnOnce()>;
133
134    fn o2e_sink(
135        compile_env: &Self::CompileEnv,
136        p1: &Self::Process,
137        p1_port: &Self::Port,
138        p2: &Self::External,
139        p2_port: &Self::Port,
140        shared_handle: String,
141    ) -> syn::Expr;
142
143    fn cluster_ids(
144        env: &Self::CompileEnv,
145        of_cluster: usize,
146    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
147    fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
148}
149
150pub trait ProcessSpec<'a, D>
151where
152    D: Deploy<'a> + ?Sized,
153{
154    fn build(self, id: usize, name_hint: &str) -> D::Process;
155}
156
157pub trait IntoProcessSpec<'a, D>
158where
159    D: Deploy<'a> + ?Sized,
160{
161    type ProcessSpec: ProcessSpec<'a, D>;
162    fn into_process_spec(self) -> Self::ProcessSpec;
163}
164
165impl<'a, D, T> IntoProcessSpec<'a, D> for T
166where
167    D: Deploy<'a> + ?Sized,
168    T: ProcessSpec<'a, D>,
169{
170    type ProcessSpec = T;
171    fn into_process_spec(self) -> Self::ProcessSpec {
172        self
173    }
174}
175
176pub trait ClusterSpec<'a, D>
177where
178    D: Deploy<'a> + ?Sized,
179{
180    fn build(self, id: usize, name_hint: &str) -> D::Cluster;
181}
182
183pub trait ExternalSpec<'a, D>
184where
185    D: Deploy<'a> + ?Sized,
186{
187    fn build(self, id: usize, name_hint: &str) -> D::External;
188}
189
190pub trait Node {
191    type Port;
192    type Meta;
193    type InstantiateEnv;
194
195    fn next_port(&self) -> Self::Port;
196
197    fn update_meta(&mut self, meta: &Self::Meta);
198
199    fn instantiate(
200        &self,
201        env: &mut Self::InstantiateEnv,
202        meta: &mut Self::Meta,
203        graph: DfirGraph,
204        extra_stmts: Vec<syn::Stmt>,
205    );
206}
207
208pub type DynSourceSink<Out, In, InErr> = (
209    Pin<Box<dyn Stream<Item = Out>>>,
210    Pin<Box<dyn Sink<In, Error = InErr>>>,
211);
212
213pub trait RegisterPort<'a, D>: Clone
214where
215    D: Deploy<'a> + ?Sized,
216{
217    fn register(&self, key: usize, port: D::Port);
218    fn raw_port(&self, key: usize) -> D::ExternalRawPort;
219
220    fn as_bytes_bidi(
221        &self,
222        key: usize,
223    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
224
225    fn as_bincode_bidi<InT, OutT>(
226        &self,
227        key: usize,
228    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
229    where
230        InT: Serialize + 'static,
231        OutT: DeserializeOwned + 'static;
232
233    fn as_bincode_sink<T>(
234        &self,
235        key: usize,
236    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
237    where
238        T: Serialize + 'static;
239
240    fn as_bincode_source<T>(
241        &self,
242        key: usize,
243    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
244    where
245        T: DeserializeOwned + 'static;
246}