hydro_lang/builder/
deploy_provider.rs

1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::location::NetworkHint;
12
13pub trait Deploy<'a> {
14    type InstantiateEnv;
15    type CompileEnv;
16
17    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
18    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
19    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
20        + RegisterPort<'a, Self>;
21    type Port: Clone;
22    type ExternalRawPort;
23    type Meta: Default;
24
25    /// Type of ID used to switch between different subgraphs at runtime.
26    type GraphId;
27
28    fn has_trivial_node() -> bool {
29        false
30    }
31
32    fn trivial_process(_id: usize) -> Self::Process {
33        panic!("No trivial process")
34    }
35
36    fn trivial_cluster(_id: usize) -> Self::Cluster {
37        panic!("No trivial cluster")
38    }
39
40    fn trivial_external(_id: usize) -> Self::External {
41        panic!("No trivial external process")
42    }
43
44    fn allocate_process_port(process: &Self::Process) -> Self::Port;
45    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
46    fn allocate_external_port(external: &Self::External) -> Self::Port;
47
48    fn o2o_sink_source(
49        compile_env: &Self::CompileEnv,
50        p1: &Self::Process,
51        p1_port: &Self::Port,
52        p2: &Self::Process,
53        p2_port: &Self::Port,
54    ) -> (syn::Expr, syn::Expr);
55    fn o2o_connect(
56        p1: &Self::Process,
57        p1_port: &Self::Port,
58        p2: &Self::Process,
59        p2_port: &Self::Port,
60    ) -> Box<dyn FnOnce()>;
61
62    fn o2m_sink_source(
63        compile_env: &Self::CompileEnv,
64        p1: &Self::Process,
65        p1_port: &Self::Port,
66        c2: &Self::Cluster,
67        c2_port: &Self::Port,
68    ) -> (syn::Expr, syn::Expr);
69    fn o2m_connect(
70        p1: &Self::Process,
71        p1_port: &Self::Port,
72        c2: &Self::Cluster,
73        c2_port: &Self::Port,
74    ) -> Box<dyn FnOnce()>;
75
76    fn m2o_sink_source(
77        compile_env: &Self::CompileEnv,
78        c1: &Self::Cluster,
79        c1_port: &Self::Port,
80        p2: &Self::Process,
81        p2_port: &Self::Port,
82    ) -> (syn::Expr, syn::Expr);
83    fn m2o_connect(
84        c1: &Self::Cluster,
85        c1_port: &Self::Port,
86        p2: &Self::Process,
87        p2_port: &Self::Port,
88    ) -> Box<dyn FnOnce()>;
89
90    fn m2m_sink_source(
91        compile_env: &Self::CompileEnv,
92        c1: &Self::Cluster,
93        c1_port: &Self::Port,
94        c2: &Self::Cluster,
95        c2_port: &Self::Port,
96    ) -> (syn::Expr, syn::Expr);
97    fn m2m_connect(
98        c1: &Self::Cluster,
99        c1_port: &Self::Port,
100        c2: &Self::Cluster,
101        c2_port: &Self::Port,
102    ) -> Box<dyn FnOnce()>;
103
104    fn e2o_many_source(
105        compile_env: &Self::CompileEnv,
106        extra_stmts: &mut Vec<syn::Stmt>,
107        p2: &Self::Process,
108        p2_port: &Self::Port,
109        codec_type: &syn::Type,
110        shared_handle: String,
111    ) -> syn::Expr;
112    fn e2o_many_sink(shared_handle: String) -> syn::Expr;
113
114    fn e2o_source(
115        compile_env: &Self::CompileEnv,
116        p1: &Self::External,
117        p1_port: &Self::Port,
118        p2: &Self::Process,
119        p2_port: &Self::Port,
120    ) -> syn::Expr;
121    fn e2o_connect(
122        p1: &Self::External,
123        p1_port: &Self::Port,
124        p2: &Self::Process,
125        p2_port: &Self::Port,
126        many: bool,
127        server_hint: NetworkHint,
128    ) -> Box<dyn FnOnce()>;
129
130    fn o2e_sink(
131        compile_env: &Self::CompileEnv,
132        p1: &Self::Process,
133        p1_port: &Self::Port,
134        p2: &Self::External,
135        p2_port: &Self::Port,
136    ) -> syn::Expr;
137    fn o2e_connect(
138        p1: &Self::Process,
139        p1_port: &Self::Port,
140        p2: &Self::External,
141        p2_port: &Self::Port,
142    ) -> Box<dyn FnOnce()>;
143
144    fn cluster_ids(
145        env: &Self::CompileEnv,
146        of_cluster: usize,
147    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
148    fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
149}
150
151pub trait ProcessSpec<'a, D>
152where
153    D: Deploy<'a> + ?Sized,
154{
155    fn build(self, id: usize, name_hint: &str) -> D::Process;
156}
157
158pub trait IntoProcessSpec<'a, D>
159where
160    D: Deploy<'a> + ?Sized,
161{
162    type ProcessSpec: ProcessSpec<'a, D>;
163    fn into_process_spec(self) -> Self::ProcessSpec;
164}
165
166impl<'a, D, T> IntoProcessSpec<'a, D> for T
167where
168    D: Deploy<'a> + ?Sized,
169    T: ProcessSpec<'a, D>,
170{
171    type ProcessSpec = T;
172    fn into_process_spec(self) -> Self::ProcessSpec {
173        self
174    }
175}
176
177pub trait ClusterSpec<'a, D>
178where
179    D: Deploy<'a> + ?Sized,
180{
181    fn build(self, id: usize, name_hint: &str) -> D::Cluster;
182}
183
184pub trait ExternalSpec<'a, D>
185where
186    D: Deploy<'a> + ?Sized,
187{
188    fn build(self, id: usize, name_hint: &str) -> D::External;
189}
190
191pub trait Node {
192    type Port;
193    type Meta;
194    type InstantiateEnv;
195
196    fn next_port(&self) -> Self::Port;
197
198    fn update_meta(&mut self, meta: &Self::Meta);
199
200    fn instantiate(
201        &self,
202        env: &mut Self::InstantiateEnv,
203        meta: &mut Self::Meta,
204        graph: DfirGraph,
205        extra_stmts: Vec<syn::Stmt>,
206    );
207}
208
209type DynSourceSink<Out, In, InErr> = (
210    Pin<Box<dyn Stream<Item = Out>>>,
211    Pin<Box<dyn Sink<In, Error = InErr>>>,
212);
213
214pub trait RegisterPort<'a, D>: Clone
215where
216    D: Deploy<'a> + ?Sized,
217{
218    fn register(&self, key: usize, port: D::Port);
219    fn raw_port(&self, key: usize) -> D::ExternalRawPort;
220
221    fn as_bytes_bidi(
222        &self,
223        key: usize,
224    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
225
226    fn as_bincode_bidi<InT, OutT>(
227        &self,
228        key: usize,
229    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
230    where
231        InT: Serialize + 'static,
232        OutT: DeserializeOwned + 'static;
233
234    fn as_bincode_sink<T>(
235        &self,
236        key: usize,
237    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
238    where
239        T: Serialize + 'static;
240
241    fn as_bincode_source<T>(
242        &self,
243        key: usize,
244    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
245    where
246        T: DeserializeOwned + 'static;
247}