1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::location::NetworkHint;
12
13pub trait Deploy<'a> {
14 type InstantiateEnv;
15 type CompileEnv;
16
17 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
18 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
19 type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
20 + RegisterPort<'a, Self>;
21 type Port: Clone;
22 type ExternalRawPort;
23 type Meta: Default;
24
25 type GraphId;
27
28 fn has_trivial_node() -> bool {
29 false
30 }
31
32 fn trivial_process(_id: usize) -> Self::Process {
33 panic!("No trivial process")
34 }
35
36 fn trivial_cluster(_id: usize) -> Self::Cluster {
37 panic!("No trivial cluster")
38 }
39
40 fn trivial_external(_id: usize) -> Self::External {
41 panic!("No trivial external process")
42 }
43
44 fn allocate_process_port(process: &Self::Process) -> Self::Port;
45 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port;
46 fn allocate_external_port(external: &Self::External) -> Self::Port;
47
48 fn o2o_sink_source(
49 compile_env: &Self::CompileEnv,
50 p1: &Self::Process,
51 p1_port: &Self::Port,
52 p2: &Self::Process,
53 p2_port: &Self::Port,
54 ) -> (syn::Expr, syn::Expr);
55 fn o2o_connect(
56 p1: &Self::Process,
57 p1_port: &Self::Port,
58 p2: &Self::Process,
59 p2_port: &Self::Port,
60 ) -> Box<dyn FnOnce()>;
61
62 fn o2m_sink_source(
63 compile_env: &Self::CompileEnv,
64 p1: &Self::Process,
65 p1_port: &Self::Port,
66 c2: &Self::Cluster,
67 c2_port: &Self::Port,
68 ) -> (syn::Expr, syn::Expr);
69 fn o2m_connect(
70 p1: &Self::Process,
71 p1_port: &Self::Port,
72 c2: &Self::Cluster,
73 c2_port: &Self::Port,
74 ) -> Box<dyn FnOnce()>;
75
76 fn m2o_sink_source(
77 compile_env: &Self::CompileEnv,
78 c1: &Self::Cluster,
79 c1_port: &Self::Port,
80 p2: &Self::Process,
81 p2_port: &Self::Port,
82 ) -> (syn::Expr, syn::Expr);
83 fn m2o_connect(
84 c1: &Self::Cluster,
85 c1_port: &Self::Port,
86 p2: &Self::Process,
87 p2_port: &Self::Port,
88 ) -> Box<dyn FnOnce()>;
89
90 fn m2m_sink_source(
91 compile_env: &Self::CompileEnv,
92 c1: &Self::Cluster,
93 c1_port: &Self::Port,
94 c2: &Self::Cluster,
95 c2_port: &Self::Port,
96 ) -> (syn::Expr, syn::Expr);
97 fn m2m_connect(
98 c1: &Self::Cluster,
99 c1_port: &Self::Port,
100 c2: &Self::Cluster,
101 c2_port: &Self::Port,
102 ) -> Box<dyn FnOnce()>;
103
104 fn e2o_many_source(
105 compile_env: &Self::CompileEnv,
106 extra_stmts: &mut Vec<syn::Stmt>,
107 p2: &Self::Process,
108 p2_port: &Self::Port,
109 codec_type: &syn::Type,
110 shared_handle: String,
111 ) -> syn::Expr;
112 fn e2o_many_sink(shared_handle: String) -> syn::Expr;
113
114 fn e2o_source(
115 compile_env: &Self::CompileEnv,
116 p1: &Self::External,
117 p1_port: &Self::Port,
118 p2: &Self::Process,
119 p2_port: &Self::Port,
120 ) -> syn::Expr;
121 fn e2o_connect(
122 p1: &Self::External,
123 p1_port: &Self::Port,
124 p2: &Self::Process,
125 p2_port: &Self::Port,
126 many: bool,
127 server_hint: NetworkHint,
128 ) -> Box<dyn FnOnce()>;
129
130 fn o2e_sink(
131 compile_env: &Self::CompileEnv,
132 p1: &Self::Process,
133 p1_port: &Self::Port,
134 p2: &Self::External,
135 p2_port: &Self::Port,
136 ) -> syn::Expr;
137 fn o2e_connect(
138 p1: &Self::Process,
139 p1_port: &Self::Port,
140 p2: &Self::External,
141 p2_port: &Self::Port,
142 ) -> Box<dyn FnOnce()>;
143
144 fn cluster_ids(
145 env: &Self::CompileEnv,
146 of_cluster: usize,
147 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a;
148 fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a;
149}
150
151pub trait ProcessSpec<'a, D>
152where
153 D: Deploy<'a> + ?Sized,
154{
155 fn build(self, id: usize, name_hint: &str) -> D::Process;
156}
157
158pub trait IntoProcessSpec<'a, D>
159where
160 D: Deploy<'a> + ?Sized,
161{
162 type ProcessSpec: ProcessSpec<'a, D>;
163 fn into_process_spec(self) -> Self::ProcessSpec;
164}
165
166impl<'a, D, T> IntoProcessSpec<'a, D> for T
167where
168 D: Deploy<'a> + ?Sized,
169 T: ProcessSpec<'a, D>,
170{
171 type ProcessSpec = T;
172 fn into_process_spec(self) -> Self::ProcessSpec {
173 self
174 }
175}
176
177pub trait ClusterSpec<'a, D>
178where
179 D: Deploy<'a> + ?Sized,
180{
181 fn build(self, id: usize, name_hint: &str) -> D::Cluster;
182}
183
184pub trait ExternalSpec<'a, D>
185where
186 D: Deploy<'a> + ?Sized,
187{
188 fn build(self, id: usize, name_hint: &str) -> D::External;
189}
190
191pub trait Node {
192 type Port;
193 type Meta;
194 type InstantiateEnv;
195
196 fn next_port(&self) -> Self::Port;
197
198 fn update_meta(&mut self, meta: &Self::Meta);
199
200 fn instantiate(
201 &self,
202 env: &mut Self::InstantiateEnv,
203 meta: &mut Self::Meta,
204 graph: DfirGraph,
205 extra_stmts: Vec<syn::Stmt>,
206 );
207}
208
209type DynSourceSink<Out, In, InErr> = (
210 Pin<Box<dyn Stream<Item = Out>>>,
211 Pin<Box<dyn Sink<In, Error = InErr>>>,
212);
213
214pub trait RegisterPort<'a, D>: Clone
215where
216 D: Deploy<'a> + ?Sized,
217{
218 fn register(&self, key: usize, port: D::Port);
219 fn raw_port(&self, key: usize) -> D::ExternalRawPort;
220
221 fn as_bytes_bidi(
222 &self,
223 key: usize,
224 ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
225
226 fn as_bincode_bidi<InT, OutT>(
227 &self,
228 key: usize,
229 ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
230 where
231 InT: Serialize + 'static,
232 OutT: DeserializeOwned + 'static;
233
234 fn as_bincode_sink<T>(
235 &self,
236 key: usize,
237 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
238 where
239 T: Serialize + 'static;
240
241 fn as_bincode_source<T>(
242 &self,
243 key: usize,
244 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
245 where
246 T: DeserializeOwned + 'static;
247}