hydro_lang/deploy/
macro_runtime.rs

1use std::cell::RefCell;
2use std::future::Future;
3use std::pin::Pin;
4use std::rc::Rc;
5
6use dfir_lang::graph::DfirGraph;
7use dfir_rs::bytes::Bytes;
8use dfir_rs::futures::{Sink, Stream};
9use dfir_rs::util::deploy::DeployPorts;
10use stageleft::{QuotedWithContext, RuntimeData};
11
12use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
13use crate::deploy_runtime::HydroMeta;
14
/// Marker type selecting the deployment backend implemented in this file.
///
/// It carries no data; all behavior lives in its `Deploy` implementation.
pub struct DeployRuntime {}
16
17impl<'a> Deploy<'a> for DeployRuntime {
18    type InstantiateEnv = ();
19    type CompileEnv = RuntimeData<&'a DeployPorts<HydroMeta>>;
20    type Process = DeployRuntimeNode;
21    type Cluster = DeployRuntimeCluster;
22    type ExternalProcess = DeployRuntimeNode;
23    type Port = String;
24    type ExternalRawPort = ();
25    type Meta = ();
26    type GraphId = usize;
27
28    fn has_trivial_node() -> bool {
29        true
30    }
31
32    fn trivial_process(_id: usize) -> Self::Process {
33        DeployRuntimeNode {
34            next_port: Rc::new(RefCell::new(0)),
35        }
36    }
37
38    fn trivial_cluster(_id: usize) -> Self::Cluster {
39        DeployRuntimeCluster {
40            next_port: Rc::new(RefCell::new(0)),
41        }
42    }
43
44    fn allocate_process_port(process: &Self::Process) -> Self::Port {
45        process.next_port()
46    }
47
48    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
49        cluster.next_port()
50    }
51
52    fn allocate_external_port(_external: &Self::ExternalProcess) -> Self::Port {
53        panic!();
54    }
55
56    fn o2o_sink_source(
57        env: &Self::CompileEnv,
58        _p1: &Self::Process,
59        p1_port: &Self::Port,
60        _p2: &Self::Process,
61        p2_port: &Self::Port,
62    ) -> (syn::Expr, syn::Expr) {
63        crate::deploy_runtime::deploy_o2o(*env, p1_port.as_str(), p2_port.as_str())
64    }
65
66    fn o2o_connect(
67        _p1: &Self::Process,
68        _p1_port: &Self::Port,
69        _p2: &Self::Process,
70        _p2_port: &Self::Port,
71    ) -> Box<dyn FnOnce()> {
72        Box::new(|| panic!())
73    }
74
75    fn o2m_sink_source(
76        env: &Self::CompileEnv,
77        _p1: &Self::Process,
78        p1_port: &Self::Port,
79        _c2: &Self::Cluster,
80        c2_port: &Self::Port,
81    ) -> (syn::Expr, syn::Expr) {
82        crate::deploy_runtime::deploy_o2m(*env, p1_port.as_str(), c2_port.as_str())
83    }
84
85    fn o2m_connect(
86        _p1: &Self::Process,
87        _p1_port: &Self::Port,
88        _c2: &Self::Cluster,
89        _c2_port: &Self::Port,
90    ) -> Box<dyn FnOnce()> {
91        Box::new(|| panic!())
92    }
93
94    fn m2o_sink_source(
95        env: &Self::CompileEnv,
96        _c1: &Self::Cluster,
97        c1_port: &Self::Port,
98        _p2: &Self::Process,
99        p2_port: &Self::Port,
100    ) -> (syn::Expr, syn::Expr) {
101        crate::deploy_runtime::deploy_m2o(*env, c1_port.as_str(), p2_port.as_str())
102    }
103
104    fn m2o_connect(
105        _c1: &Self::Cluster,
106        _c1_port: &Self::Port,
107        _p2: &Self::Process,
108        _p2_port: &Self::Port,
109    ) -> Box<dyn FnOnce()> {
110        Box::new(|| panic!())
111    }
112
113    fn m2m_sink_source(
114        env: &Self::CompileEnv,
115        _c1: &Self::Cluster,
116        c1_port: &Self::Port,
117        _c2: &Self::Cluster,
118        c2_port: &Self::Port,
119    ) -> (syn::Expr, syn::Expr) {
120        crate::deploy_runtime::deploy_m2m(*env, c1_port.as_str(), c2_port.as_str())
121    }
122
123    fn m2m_connect(
124        _c1: &Self::Cluster,
125        _c1_port: &Self::Port,
126        _c2: &Self::Cluster,
127        _c2_port: &Self::Port,
128    ) -> Box<dyn FnOnce()> {
129        Box::new(|| panic!())
130    }
131
132    fn e2o_source(
133        _compile_env: &Self::CompileEnv,
134        _p1: &Self::ExternalProcess,
135        _p1_port: &Self::Port,
136        _p2: &Self::Process,
137        _p2_port: &Self::Port,
138    ) -> syn::Expr {
139        panic!()
140    }
141
142    fn e2o_connect(
143        _p1: &Self::ExternalProcess,
144        _p1_port: &Self::Port,
145        _p2: &Self::Process,
146        _p2_port: &Self::Port,
147    ) -> Box<dyn FnOnce()> {
148        panic!()
149    }
150
151    fn o2e_sink(
152        _compile_env: &Self::CompileEnv,
153        _p1: &Self::Process,
154        _p1_port: &Self::Port,
155        _p2: &Self::ExternalProcess,
156        _p2_port: &Self::Port,
157    ) -> syn::Expr {
158        panic!()
159    }
160
161    fn o2e_connect(
162        _p1: &Self::Process,
163        _p1_port: &Self::Port,
164        _p2: &Self::ExternalProcess,
165        _p2_port: &Self::Port,
166    ) -> Box<dyn FnOnce()> {
167        panic!()
168    }
169
170    fn cluster_ids(
171        env: &Self::CompileEnv,
172        of_cluster: usize,
173    ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
174        crate::deploy_runtime::cluster_members(*env, of_cluster)
175    }
176
177    fn cluster_self_id(env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
178        crate::deploy_runtime::cluster_self_id(*env)
179    }
180}
181
/// Node handle used for processes (and as the external-process type).
///
/// Cloning is shallow: every clone shares the same port counter through the
/// `Rc`, so port names handed out by any clone advance the same sequence.
#[derive(Clone)]
pub struct DeployRuntimeNode {
    /// Shared counter holding the index of the next port name to hand out.
    next_port: Rc<RefCell<usize>>,
}
186
187impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode {
188    fn register(&self, _key: usize, _port: <DeployRuntime as Deploy>::Port) {
189        panic!()
190    }
191
192    fn raw_port(&self, _key: usize) -> <DeployRuntime as Deploy>::ExternalRawPort {
193        panic!()
194    }
195
196    #[expect(
197        clippy::manual_async_fn,
198        reason = "buggy Clippy lint for lifetime bounds"
199    )]
200    fn as_bytes_sink(
201        &self,
202        _key: usize,
203    ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>> + 'a {
204        async { panic!() }
205    }
206
207    #[expect(
208        clippy::manual_async_fn,
209        reason = "buggy Clippy lint for lifetime bounds"
210    )]
211    fn as_bincode_sink<T: serde::Serialize + 'static>(
212        &self,
213        _key: usize,
214    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a {
215        async { panic!() }
216    }
217
218    #[expect(
219        clippy::manual_async_fn,
220        reason = "buggy Clippy lint for lifetime bounds"
221    )]
222    fn as_bytes_source(
223        &self,
224        _key: usize,
225    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a {
226        async { panic!() }
227    }
228
229    #[expect(
230        clippy::manual_async_fn,
231        reason = "buggy Clippy lint for lifetime bounds"
232    )]
233    fn as_bincode_source<T: serde::de::DeserializeOwned + 'static>(
234        &self,
235        _key: usize,
236    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
237        async { panic!() }
238    }
239}
240
241impl Node for DeployRuntimeNode {
242    type Port = String;
243    type Meta = ();
244    type InstantiateEnv = ();
245
246    fn next_port(&self) -> String {
247        let next_send_port = *self.next_port.borrow();
248        *self.next_port.borrow_mut() += 1;
249        format!("port_{}", next_send_port)
250    }
251
252    fn update_meta(&mut self, _meta: &Self::Meta) {}
253
254    fn instantiate(
255        &self,
256        _env: &mut Self::InstantiateEnv,
257        _meta: &mut Self::Meta,
258        _graph: DfirGraph,
259        _extra_stmts: Vec<syn::Stmt>,
260    ) {
261        panic!(".deploy() cannot be called on a DeployRuntimeNode");
262    }
263}
264
/// Node handle used for clusters.
///
/// Cloning is shallow: every clone shares the same port counter through the
/// `Rc`, so port names handed out by any clone advance the same sequence.
#[derive(Clone)]
pub struct DeployRuntimeCluster {
    /// Shared counter holding the index of the next port name to hand out.
    next_port: Rc<RefCell<usize>>,
}
269
270impl Node for DeployRuntimeCluster {
271    type Port = String;
272    type Meta = ();
273    type InstantiateEnv = ();
274
275    fn next_port(&self) -> String {
276        let next_send_port = *self.next_port.borrow();
277        *self.next_port.borrow_mut() += 1;
278        format!("port_{}", next_send_port)
279    }
280
281    fn update_meta(&mut self, _meta: &Self::Meta) {}
282
283    fn instantiate(
284        &self,
285        _env: &mut Self::InstantiateEnv,
286        _meta: &mut Self::Meta,
287        _graph: DfirGraph,
288        _extra_stmts: Vec<syn::Stmt>,
289    ) {
290        panic!(".deploy() cannot be called on a DeployRuntimeCluster");
291    }
292}
293
294impl ProcessSpec<'_, DeployRuntime> for () {
295    fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeNode {
296        DeployRuntimeNode {
297            next_port: Rc::new(RefCell::new(0)),
298        }
299    }
300}
301
302impl ClusterSpec<'_, DeployRuntime> for () {
303    fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeCluster {
304        DeployRuntimeCluster {
305            next_port: Rc::new(RefCell::new(0)),
306        }
307    }
308}
309
310impl ExternalSpec<'_, DeployRuntime> for () {
311    fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeNode {
312        panic!()
313    }
314}