1use std::cell::RefCell;
2use std::future::Future;
3use std::pin::Pin;
4use std::rc::Rc;
5
6use bytes::Bytes;
7use dfir_lang::graph::DfirGraph;
8use futures::{Sink, Stream};
9use hydro_deploy_integration::DeployPorts;
10use stageleft::{QuotedWithContext, RuntimeData};
11
12use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
13use crate::deploy_runtime::HydroMeta;
14
/// Field-less marker type that the [`Deploy`], [`ProcessSpec`], [`ClusterSpec`]
/// and [`ExternalSpec`] implementations in this file are attached to.
pub struct DeployRuntime {}
16
17impl<'a> Deploy<'a> for DeployRuntime {
18 type InstantiateEnv = ();
19 type CompileEnv = RuntimeData<&'a DeployPorts<HydroMeta>>;
20 type Process = DeployRuntimeNode;
21 type Cluster = DeployRuntimeCluster;
22 type ExternalProcess = DeployRuntimeNode;
23 type Port = String;
24 type ExternalRawPort = ();
25 type Meta = ();
26 type GraphId = usize;
27
28 fn has_trivial_node() -> bool {
29 true
30 }
31
32 fn trivial_process(_id: usize) -> Self::Process {
33 DeployRuntimeNode {
34 next_port: Rc::new(RefCell::new(0)),
35 }
36 }
37
38 fn trivial_cluster(_id: usize) -> Self::Cluster {
39 DeployRuntimeCluster {
40 next_port: Rc::new(RefCell::new(0)),
41 }
42 }
43
44 fn allocate_process_port(process: &Self::Process) -> Self::Port {
45 process.next_port()
46 }
47
48 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
49 cluster.next_port()
50 }
51
52 fn allocate_external_port(_external: &Self::ExternalProcess) -> Self::Port {
53 panic!();
54 }
55
56 fn o2o_sink_source(
57 env: &Self::CompileEnv,
58 _p1: &Self::Process,
59 p1_port: &Self::Port,
60 _p2: &Self::Process,
61 p2_port: &Self::Port,
62 ) -> (syn::Expr, syn::Expr) {
63 crate::deploy_runtime::deploy_o2o(*env, p1_port.as_str(), p2_port.as_str())
64 }
65
66 fn o2o_connect(
67 _p1: &Self::Process,
68 _p1_port: &Self::Port,
69 _p2: &Self::Process,
70 _p2_port: &Self::Port,
71 ) -> Box<dyn FnOnce()> {
72 Box::new(|| panic!())
73 }
74
75 fn o2m_sink_source(
76 env: &Self::CompileEnv,
77 _p1: &Self::Process,
78 p1_port: &Self::Port,
79 _c2: &Self::Cluster,
80 c2_port: &Self::Port,
81 ) -> (syn::Expr, syn::Expr) {
82 crate::deploy_runtime::deploy_o2m(*env, p1_port.as_str(), c2_port.as_str())
83 }
84
85 fn o2m_connect(
86 _p1: &Self::Process,
87 _p1_port: &Self::Port,
88 _c2: &Self::Cluster,
89 _c2_port: &Self::Port,
90 ) -> Box<dyn FnOnce()> {
91 Box::new(|| panic!())
92 }
93
94 fn m2o_sink_source(
95 env: &Self::CompileEnv,
96 _c1: &Self::Cluster,
97 c1_port: &Self::Port,
98 _p2: &Self::Process,
99 p2_port: &Self::Port,
100 ) -> (syn::Expr, syn::Expr) {
101 crate::deploy_runtime::deploy_m2o(*env, c1_port.as_str(), p2_port.as_str())
102 }
103
104 fn m2o_connect(
105 _c1: &Self::Cluster,
106 _c1_port: &Self::Port,
107 _p2: &Self::Process,
108 _p2_port: &Self::Port,
109 ) -> Box<dyn FnOnce()> {
110 Box::new(|| panic!())
111 }
112
113 fn m2m_sink_source(
114 env: &Self::CompileEnv,
115 _c1: &Self::Cluster,
116 c1_port: &Self::Port,
117 _c2: &Self::Cluster,
118 c2_port: &Self::Port,
119 ) -> (syn::Expr, syn::Expr) {
120 crate::deploy_runtime::deploy_m2m(*env, c1_port.as_str(), c2_port.as_str())
121 }
122
123 fn m2m_connect(
124 _c1: &Self::Cluster,
125 _c1_port: &Self::Port,
126 _c2: &Self::Cluster,
127 _c2_port: &Self::Port,
128 ) -> Box<dyn FnOnce()> {
129 Box::new(|| panic!())
130 }
131
132 fn e2o_source(
133 _compile_env: &Self::CompileEnv,
134 _p1: &Self::ExternalProcess,
135 _p1_port: &Self::Port,
136 _p2: &Self::Process,
137 _p2_port: &Self::Port,
138 ) -> syn::Expr {
139 panic!()
140 }
141
142 fn e2o_connect(
143 _p1: &Self::ExternalProcess,
144 _p1_port: &Self::Port,
145 _p2: &Self::Process,
146 _p2_port: &Self::Port,
147 ) -> Box<dyn FnOnce()> {
148 panic!()
149 }
150
151 fn o2e_sink(
152 _compile_env: &Self::CompileEnv,
153 _p1: &Self::Process,
154 _p1_port: &Self::Port,
155 _p2: &Self::ExternalProcess,
156 _p2_port: &Self::Port,
157 ) -> syn::Expr {
158 panic!()
159 }
160
161 fn o2e_connect(
162 _p1: &Self::Process,
163 _p1_port: &Self::Port,
164 _p2: &Self::ExternalProcess,
165 _p2_port: &Self::Port,
166 ) -> Box<dyn FnOnce()> {
167 panic!()
168 }
169
170 fn cluster_ids(
171 env: &Self::CompileEnv,
172 of_cluster: usize,
173 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
174 crate::deploy_runtime::cluster_members(*env, of_cluster)
175 }
176
177 fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
178 crate::deploy_runtime::cluster_self_id(*env)
179 }
180}
181
/// Process (and external-process) node handle used by `DeployRuntime`.
///
/// Cloning is shallow: the port counter lives behind an [`Rc`], so every clone
/// observes and advances the same counter.
#[derive(Clone, Debug)]
pub struct DeployRuntimeNode {
    /// Index of the next port to hand out; shared between clones.
    next_port: Rc<RefCell<usize>>,
}
186
187impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode {
188 fn register(&self, _key: usize, _port: <DeployRuntime as Deploy>::Port) {
189 panic!()
190 }
191
192 fn raw_port(&self, _key: usize) -> <DeployRuntime as Deploy>::ExternalRawPort {
193 panic!()
194 }
195
196 #[expect(
197 clippy::manual_async_fn,
198 reason = "buggy Clippy lint for lifetime bounds"
199 )]
200 fn as_bytes_sink(
201 &self,
202 _key: usize,
203 ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>> + 'a {
204 async { panic!() }
205 }
206
207 #[expect(
208 clippy::manual_async_fn,
209 reason = "buggy Clippy lint for lifetime bounds"
210 )]
211 fn as_bincode_sink<T>(
212 &self,
213 _key: usize,
214 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
215 where
216 T: serde::Serialize + 'static,
217 {
218 async { panic!() }
219 }
220
221 #[expect(
222 clippy::manual_async_fn,
223 reason = "buggy Clippy lint for lifetime bounds"
224 )]
225 fn as_bytes_source(
226 &self,
227 _key: usize,
228 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a {
229 async { panic!() }
230 }
231
232 #[expect(
233 clippy::manual_async_fn,
234 reason = "buggy Clippy lint for lifetime bounds"
235 )]
236 fn as_bincode_source<T>(
237 &self,
238 _key: usize,
239 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
240 where
241 T: serde::de::DeserializeOwned + 'static,
242 {
243 async { panic!() }
244 }
245}
246
247impl Node for DeployRuntimeNode {
248 type Port = String;
249 type Meta = ();
250 type InstantiateEnv = ();
251
252 fn next_port(&self) -> String {
253 let next_send_port = *self.next_port.borrow();
254 *self.next_port.borrow_mut() += 1;
255 format!("port_{}", next_send_port)
256 }
257
258 fn update_meta(&mut self, _meta: &Self::Meta) {}
259
260 fn instantiate(
261 &self,
262 _env: &mut Self::InstantiateEnv,
263 _meta: &mut Self::Meta,
264 _graph: DfirGraph,
265 _extra_stmts: Vec<syn::Stmt>,
266 ) {
267 panic!(".deploy() cannot be called on a DeployRuntimeNode");
268 }
269}
270
/// Cluster node handle used by `DeployRuntime`.
///
/// Cloning is shallow: the port counter lives behind an [`Rc`], so every clone
/// observes and advances the same counter.
#[derive(Clone, Debug)]
pub struct DeployRuntimeCluster {
    /// Index of the next port to hand out; shared between clones.
    next_port: Rc<RefCell<usize>>,
}
275
276impl Node for DeployRuntimeCluster {
277 type Port = String;
278 type Meta = ();
279 type InstantiateEnv = ();
280
281 fn next_port(&self) -> String {
282 let next_send_port = *self.next_port.borrow();
283 *self.next_port.borrow_mut() += 1;
284 format!("port_{}", next_send_port)
285 }
286
287 fn update_meta(&mut self, _meta: &Self::Meta) {}
288
289 fn instantiate(
290 &self,
291 _env: &mut Self::InstantiateEnv,
292 _meta: &mut Self::Meta,
293 _graph: DfirGraph,
294 _extra_stmts: Vec<syn::Stmt>,
295 ) {
296 panic!(".deploy() cannot be called on a DeployRuntimeCluster");
297 }
298}
299
300impl ProcessSpec<'_, DeployRuntime> for () {
301 fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeNode {
302 DeployRuntimeNode {
303 next_port: Rc::new(RefCell::new(0)),
304 }
305 }
306}
307
308impl ClusterSpec<'_, DeployRuntime> for () {
309 fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeCluster {
310 DeployRuntimeCluster {
311 next_port: Rc::new(RefCell::new(0)),
312 }
313 }
314}
315
316impl ExternalSpec<'_, DeployRuntime> for () {
317 fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeNode {
318 panic!()
319 }
320}