1use std::cell::RefCell;
2use std::future::Future;
3use std::pin::Pin;
4use std::rc::Rc;
5
6use dfir_lang::graph::DfirGraph;
7use dfir_rs::bytes::Bytes;
8use dfir_rs::futures::{Sink, Stream};
9use dfir_rs::util::deploy::DeployPorts;
10use stageleft::{QuotedWithContext, RuntimeData};
11
12use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
13use crate::deploy_runtime::HydroMeta;
14
15pub struct DeployRuntime {}
16
17impl<'a> Deploy<'a> for DeployRuntime {
18 type InstantiateEnv = ();
19 type CompileEnv = RuntimeData<&'a DeployPorts<HydroMeta>>;
20 type Process = DeployRuntimeNode;
21 type Cluster = DeployRuntimeCluster;
22 type ExternalProcess = DeployRuntimeNode;
23 type Port = String;
24 type ExternalRawPort = ();
25 type Meta = ();
26 type GraphId = usize;
27
28 fn has_trivial_node() -> bool {
29 true
30 }
31
32 fn trivial_process(_id: usize) -> Self::Process {
33 DeployRuntimeNode {
34 next_port: Rc::new(RefCell::new(0)),
35 }
36 }
37
38 fn trivial_cluster(_id: usize) -> Self::Cluster {
39 DeployRuntimeCluster {
40 next_port: Rc::new(RefCell::new(0)),
41 }
42 }
43
44 fn allocate_process_port(process: &Self::Process) -> Self::Port {
45 process.next_port()
46 }
47
48 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
49 cluster.next_port()
50 }
51
52 fn allocate_external_port(_external: &Self::ExternalProcess) -> Self::Port {
53 panic!();
54 }
55
56 fn o2o_sink_source(
57 env: &Self::CompileEnv,
58 _p1: &Self::Process,
59 p1_port: &Self::Port,
60 _p2: &Self::Process,
61 p2_port: &Self::Port,
62 ) -> (syn::Expr, syn::Expr) {
63 crate::deploy_runtime::deploy_o2o(*env, p1_port.as_str(), p2_port.as_str())
64 }
65
66 fn o2o_connect(
67 _p1: &Self::Process,
68 _p1_port: &Self::Port,
69 _p2: &Self::Process,
70 _p2_port: &Self::Port,
71 ) -> Box<dyn FnOnce()> {
72 Box::new(|| panic!())
73 }
74
75 fn o2m_sink_source(
76 env: &Self::CompileEnv,
77 _p1: &Self::Process,
78 p1_port: &Self::Port,
79 _c2: &Self::Cluster,
80 c2_port: &Self::Port,
81 ) -> (syn::Expr, syn::Expr) {
82 crate::deploy_runtime::deploy_o2m(*env, p1_port.as_str(), c2_port.as_str())
83 }
84
85 fn o2m_connect(
86 _p1: &Self::Process,
87 _p1_port: &Self::Port,
88 _c2: &Self::Cluster,
89 _c2_port: &Self::Port,
90 ) -> Box<dyn FnOnce()> {
91 Box::new(|| panic!())
92 }
93
94 fn m2o_sink_source(
95 env: &Self::CompileEnv,
96 _c1: &Self::Cluster,
97 c1_port: &Self::Port,
98 _p2: &Self::Process,
99 p2_port: &Self::Port,
100 ) -> (syn::Expr, syn::Expr) {
101 crate::deploy_runtime::deploy_m2o(*env, c1_port.as_str(), p2_port.as_str())
102 }
103
104 fn m2o_connect(
105 _c1: &Self::Cluster,
106 _c1_port: &Self::Port,
107 _p2: &Self::Process,
108 _p2_port: &Self::Port,
109 ) -> Box<dyn FnOnce()> {
110 Box::new(|| panic!())
111 }
112
113 fn m2m_sink_source(
114 env: &Self::CompileEnv,
115 _c1: &Self::Cluster,
116 c1_port: &Self::Port,
117 _c2: &Self::Cluster,
118 c2_port: &Self::Port,
119 ) -> (syn::Expr, syn::Expr) {
120 crate::deploy_runtime::deploy_m2m(*env, c1_port.as_str(), c2_port.as_str())
121 }
122
123 fn m2m_connect(
124 _c1: &Self::Cluster,
125 _c1_port: &Self::Port,
126 _c2: &Self::Cluster,
127 _c2_port: &Self::Port,
128 ) -> Box<dyn FnOnce()> {
129 Box::new(|| panic!())
130 }
131
132 fn e2o_source(
133 _compile_env: &Self::CompileEnv,
134 _p1: &Self::ExternalProcess,
135 _p1_port: &Self::Port,
136 _p2: &Self::Process,
137 _p2_port: &Self::Port,
138 ) -> syn::Expr {
139 panic!()
140 }
141
142 fn e2o_connect(
143 _p1: &Self::ExternalProcess,
144 _p1_port: &Self::Port,
145 _p2: &Self::Process,
146 _p2_port: &Self::Port,
147 ) -> Box<dyn FnOnce()> {
148 panic!()
149 }
150
151 fn o2e_sink(
152 _compile_env: &Self::CompileEnv,
153 _p1: &Self::Process,
154 _p1_port: &Self::Port,
155 _p2: &Self::ExternalProcess,
156 _p2_port: &Self::Port,
157 ) -> syn::Expr {
158 panic!()
159 }
160
161 fn o2e_connect(
162 _p1: &Self::Process,
163 _p1_port: &Self::Port,
164 _p2: &Self::ExternalProcess,
165 _p2_port: &Self::Port,
166 ) -> Box<dyn FnOnce()> {
167 panic!()
168 }
169
170 fn cluster_ids(
171 env: &Self::CompileEnv,
172 of_cluster: usize,
173 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
174 crate::deploy_runtime::cluster_members(*env, of_cluster)
175 }
176
177 fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
178 crate::deploy_runtime::cluster_self_id(*env)
179 }
180}
181
182#[derive(Clone)]
183pub struct DeployRuntimeNode {
184 next_port: Rc<RefCell<usize>>,
185}
186
187impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode {
188 fn register(&self, _key: usize, _port: <DeployRuntime as Deploy>::Port) {
189 panic!()
190 }
191
192 fn raw_port(&self, _key: usize) -> <DeployRuntime as Deploy>::ExternalRawPort {
193 panic!()
194 }
195
196 #[expect(
197 clippy::manual_async_fn,
198 reason = "buggy Clippy lint for lifetime bounds"
199 )]
200 fn as_bytes_sink(
201 &self,
202 _key: usize,
203 ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>> + 'a {
204 async { panic!() }
205 }
206
207 #[expect(
208 clippy::manual_async_fn,
209 reason = "buggy Clippy lint for lifetime bounds"
210 )]
211 fn as_bincode_sink<T: serde::Serialize + 'static>(
212 &self,
213 _key: usize,
214 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a {
215 async { panic!() }
216 }
217
218 #[expect(
219 clippy::manual_async_fn,
220 reason = "buggy Clippy lint for lifetime bounds"
221 )]
222 fn as_bytes_source(
223 &self,
224 _key: usize,
225 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a {
226 async { panic!() }
227 }
228
229 #[expect(
230 clippy::manual_async_fn,
231 reason = "buggy Clippy lint for lifetime bounds"
232 )]
233 fn as_bincode_source<T: serde::de::DeserializeOwned + 'static>(
234 &self,
235 _key: usize,
236 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
237 async { panic!() }
238 }
239}
240
241impl Node for DeployRuntimeNode {
242 type Port = String;
243 type Meta = ();
244 type InstantiateEnv = ();
245
246 fn next_port(&self) -> String {
247 let next_send_port = *self.next_port.borrow();
248 *self.next_port.borrow_mut() += 1;
249 format!("port_{}", next_send_port)
250 }
251
252 fn update_meta(&mut self, _meta: &Self::Meta) {}
253
254 fn instantiate(
255 &self,
256 _env: &mut Self::InstantiateEnv,
257 _meta: &mut Self::Meta,
258 _graph: DfirGraph,
259 _extra_stmts: Vec<syn::Stmt>,
260 ) {
261 panic!(".deploy() cannot be called on a DeployRuntimeNode");
262 }
263}
264
265#[derive(Clone)]
266pub struct DeployRuntimeCluster {
267 next_port: Rc<RefCell<usize>>,
268}
269
270impl Node for DeployRuntimeCluster {
271 type Port = String;
272 type Meta = ();
273 type InstantiateEnv = ();
274
275 fn next_port(&self) -> String {
276 let next_send_port = *self.next_port.borrow();
277 *self.next_port.borrow_mut() += 1;
278 format!("port_{}", next_send_port)
279 }
280
281 fn update_meta(&mut self, _meta: &Self::Meta) {}
282
283 fn instantiate(
284 &self,
285 _env: &mut Self::InstantiateEnv,
286 _meta: &mut Self::Meta,
287 _graph: DfirGraph,
288 _extra_stmts: Vec<syn::Stmt>,
289 ) {
290 panic!(".deploy() cannot be called on a DeployRuntimeCluster");
291 }
292}
293
294impl ProcessSpec<'_, DeployRuntime> for () {
295 fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeNode {
296 DeployRuntimeNode {
297 next_port: Rc::new(RefCell::new(0)),
298 }
299 }
300}
301
302impl ClusterSpec<'_, DeployRuntime> for () {
303 fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeCluster {
304 DeployRuntimeCluster {
305 next_port: Rc::new(RefCell::new(0)),
306 }
307 }
308}
309
310impl ExternalSpec<'_, DeployRuntime> for () {
311 fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeNode {
312 panic!()
313 }
314}