hydro_lang/deploy/
macro_runtime.rs

1use std::cell::RefCell;
2use std::future::Future;
3use std::pin::Pin;
4use std::rc::Rc;
5
6use bytes::Bytes;
7use dfir_lang::graph::DfirGraph;
8use futures::{Sink, Stream};
9use hydro_deploy_integration::DeployPorts;
10use stageleft::{QuotedWithContext, RuntimeData};
11
12use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
13use crate::deploy_runtime::HydroMeta;
14
15pub struct DeployRuntime {}
16
17impl<'a> Deploy<'a> for DeployRuntime {
18    type InstantiateEnv = ();
19    type CompileEnv = RuntimeData<&'a DeployPorts<HydroMeta>>;
20    type Process = DeployRuntimeNode;
21    type Cluster = DeployRuntimeCluster;
22    type ExternalProcess = DeployRuntimeNode;
23    type Port = String;
24    type ExternalRawPort = ();
25    type Meta = ();
26    type GraphId = usize;
27
28    fn has_trivial_node() -> bool {
29        true
30    }
31
32    fn trivial_process(_id: usize) -> Self::Process {
33        DeployRuntimeNode {
34            next_port: Rc::new(RefCell::new(0)),
35        }
36    }
37
38    fn trivial_cluster(_id: usize) -> Self::Cluster {
39        DeployRuntimeCluster {
40            next_port: Rc::new(RefCell::new(0)),
41        }
42    }
43
44    fn allocate_process_port(process: &Self::Process) -> Self::Port {
45        process.next_port()
46    }
47
48    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
49        cluster.next_port()
50    }
51
52    fn allocate_external_port(_external: &Self::ExternalProcess) -> Self::Port {
53        panic!();
54    }
55
56    fn o2o_sink_source(
57        env: &Self::CompileEnv,
58        _p1: &Self::Process,
59        p1_port: &Self::Port,
60        _p2: &Self::Process,
61        p2_port: &Self::Port,
62    ) -> (syn::Expr, syn::Expr) {
63        crate::deploy_runtime::deploy_o2o(*env, p1_port.as_str(), p2_port.as_str())
64    }
65
66    fn o2o_connect(
67        _p1: &Self::Process,
68        _p1_port: &Self::Port,
69        _p2: &Self::Process,
70        _p2_port: &Self::Port,
71    ) -> Box<dyn FnOnce()> {
72        Box::new(|| panic!())
73    }
74
75    fn o2m_sink_source(
76        env: &Self::CompileEnv,
77        _p1: &Self::Process,
78        p1_port: &Self::Port,
79        _c2: &Self::Cluster,
80        c2_port: &Self::Port,
81    ) -> (syn::Expr, syn::Expr) {
82        crate::deploy_runtime::deploy_o2m(*env, p1_port.as_str(), c2_port.as_str())
83    }
84
85    fn o2m_connect(
86        _p1: &Self::Process,
87        _p1_port: &Self::Port,
88        _c2: &Self::Cluster,
89        _c2_port: &Self::Port,
90    ) -> Box<dyn FnOnce()> {
91        Box::new(|| panic!())
92    }
93
94    fn m2o_sink_source(
95        env: &Self::CompileEnv,
96        _c1: &Self::Cluster,
97        c1_port: &Self::Port,
98        _p2: &Self::Process,
99        p2_port: &Self::Port,
100    ) -> (syn::Expr, syn::Expr) {
101        crate::deploy_runtime::deploy_m2o(*env, c1_port.as_str(), p2_port.as_str())
102    }
103
104    fn m2o_connect(
105        _c1: &Self::Cluster,
106        _c1_port: &Self::Port,
107        _p2: &Self::Process,
108        _p2_port: &Self::Port,
109    ) -> Box<dyn FnOnce()> {
110        Box::new(|| panic!())
111    }
112
113    fn m2m_sink_source(
114        env: &Self::CompileEnv,
115        _c1: &Self::Cluster,
116        c1_port: &Self::Port,
117        _c2: &Self::Cluster,
118        c2_port: &Self::Port,
119    ) -> (syn::Expr, syn::Expr) {
120        crate::deploy_runtime::deploy_m2m(*env, c1_port.as_str(), c2_port.as_str())
121    }
122
123    fn m2m_connect(
124        _c1: &Self::Cluster,
125        _c1_port: &Self::Port,
126        _c2: &Self::Cluster,
127        _c2_port: &Self::Port,
128    ) -> Box<dyn FnOnce()> {
129        Box::new(|| panic!())
130    }
131
132    fn e2o_source(
133        _compile_env: &Self::CompileEnv,
134        _p1: &Self::ExternalProcess,
135        _p1_port: &Self::Port,
136        _p2: &Self::Process,
137        _p2_port: &Self::Port,
138    ) -> syn::Expr {
139        panic!()
140    }
141
142    fn e2o_connect(
143        _p1: &Self::ExternalProcess,
144        _p1_port: &Self::Port,
145        _p2: &Self::Process,
146        _p2_port: &Self::Port,
147    ) -> Box<dyn FnOnce()> {
148        panic!()
149    }
150
151    fn o2e_sink(
152        _compile_env: &Self::CompileEnv,
153        _p1: &Self::Process,
154        _p1_port: &Self::Port,
155        _p2: &Self::ExternalProcess,
156        _p2_port: &Self::Port,
157    ) -> syn::Expr {
158        panic!()
159    }
160
161    fn o2e_connect(
162        _p1: &Self::Process,
163        _p1_port: &Self::Port,
164        _p2: &Self::ExternalProcess,
165        _p2_port: &Self::Port,
166    ) -> Box<dyn FnOnce()> {
167        panic!()
168    }
169
170    fn cluster_ids(
171        env: &Self::CompileEnv,
172        of_cluster: usize,
173    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
174        crate::deploy_runtime::cluster_members(*env, of_cluster)
175    }
176
177    fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
178        crate::deploy_runtime::cluster_self_id(*env)
179    }
180}
181
182#[derive(Clone)]
183pub struct DeployRuntimeNode {
184    next_port: Rc<RefCell<usize>>,
185}
186
187impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode {
188    fn register(&self, _key: usize, _port: <DeployRuntime as Deploy>::Port) {
189        panic!()
190    }
191
192    fn raw_port(&self, _key: usize) -> <DeployRuntime as Deploy>::ExternalRawPort {
193        panic!()
194    }
195
196    #[expect(
197        clippy::manual_async_fn,
198        reason = "buggy Clippy lint for lifetime bounds"
199    )]
200    fn as_bytes_sink(
201        &self,
202        _key: usize,
203    ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>> + 'a {
204        async { panic!() }
205    }
206
207    #[expect(
208        clippy::manual_async_fn,
209        reason = "buggy Clippy lint for lifetime bounds"
210    )]
211    fn as_bincode_sink<T>(
212        &self,
213        _key: usize,
214    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
215    where
216        T: serde::Serialize + 'static,
217    {
218        async { panic!() }
219    }
220
221    #[expect(
222        clippy::manual_async_fn,
223        reason = "buggy Clippy lint for lifetime bounds"
224    )]
225    fn as_bytes_source(
226        &self,
227        _key: usize,
228    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a {
229        async { panic!() }
230    }
231
232    #[expect(
233        clippy::manual_async_fn,
234        reason = "buggy Clippy lint for lifetime bounds"
235    )]
236    fn as_bincode_source<T>(
237        &self,
238        _key: usize,
239    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
240    where
241        T: serde::de::DeserializeOwned + 'static,
242    {
243        async { panic!() }
244    }
245}
246
247impl Node for DeployRuntimeNode {
248    type Port = String;
249    type Meta = ();
250    type InstantiateEnv = ();
251
252    fn next_port(&self) -> String {
253        let next_send_port = *self.next_port.borrow();
254        *self.next_port.borrow_mut() += 1;
255        format!("port_{}", next_send_port)
256    }
257
258    fn update_meta(&mut self, _meta: &Self::Meta) {}
259
260    fn instantiate(
261        &self,
262        _env: &mut Self::InstantiateEnv,
263        _meta: &mut Self::Meta,
264        _graph: DfirGraph,
265        _extra_stmts: Vec<syn::Stmt>,
266    ) {
267        panic!(".deploy() cannot be called on a DeployRuntimeNode");
268    }
269}
270
271#[derive(Clone)]
272pub struct DeployRuntimeCluster {
273    next_port: Rc<RefCell<usize>>,
274}
275
276impl Node for DeployRuntimeCluster {
277    type Port = String;
278    type Meta = ();
279    type InstantiateEnv = ();
280
281    fn next_port(&self) -> String {
282        let next_send_port = *self.next_port.borrow();
283        *self.next_port.borrow_mut() += 1;
284        format!("port_{}", next_send_port)
285    }
286
287    fn update_meta(&mut self, _meta: &Self::Meta) {}
288
289    fn instantiate(
290        &self,
291        _env: &mut Self::InstantiateEnv,
292        _meta: &mut Self::Meta,
293        _graph: DfirGraph,
294        _extra_stmts: Vec<syn::Stmt>,
295    ) {
296        panic!(".deploy() cannot be called on a DeployRuntimeCluster");
297    }
298}
299
300impl ProcessSpec<'_, DeployRuntime> for () {
301    fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeNode {
302        DeployRuntimeNode {
303            next_port: Rc::new(RefCell::new(0)),
304        }
305    }
306}
307
308impl ClusterSpec<'_, DeployRuntime> for () {
309    fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeCluster {
310        DeployRuntimeCluster {
311            next_port: Rc::new(RefCell::new(0)),
312        }
313    }
314}
315
316impl ExternalSpec<'_, DeployRuntime> for () {
317    fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeNode {
318        panic!()
319    }
320}