1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::future::Future;
4use std::io::Error;
5use std::pin::Pin;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use dfir_lang::graph::DfirGraph;
10use dfir_rs::bytes::Bytes;
11use dfir_rs::futures::{Sink, SinkExt, Stream, StreamExt};
12use dfir_rs::util::deploy::{ConnectedSink, ConnectedSource};
13use hydro_deploy::custom_service::CustomClientPort;
14use hydro_deploy::rust_crate::RustCrateService;
15use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
16use hydro_deploy::rust_crate::tracing_options::TracingOptions;
17use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
18use nameof::name_of;
19use serde::Serialize;
20use serde::de::DeserializeOwned;
21use stageleft::{QuotedWithContext, RuntimeData};
22use tokio::sync::RwLock;
23
24use super::trybuild::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
25use super::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort};
26use crate::deploy_runtime::*;
27
/// Marker type for the Hydro Deploy backend.
///
/// It carries no data; it exists only so that [`Deploy`] and the related spec
/// traits in this file can be implemented for it.
pub struct HydroDeploy {}
29
30impl<'a> Deploy<'a> for HydroDeploy {
31 type InstantiateEnv = Deployment;
32 type CompileEnv = ();
33 type Process = DeployNode;
34 type Cluster = DeployCluster;
35 type ExternalProcess = DeployExternal;
36 type Meta = HashMap<usize, Vec<u32>>;
37 type GraphId = ();
38 type Port = String;
39 type ExternalRawPort = CustomClientPort;
40
41 fn allocate_process_port(process: &Self::Process) -> Self::Port {
42 process.next_port()
43 }
44
45 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
46 cluster.next_port()
47 }
48
49 fn allocate_external_port(external: &Self::ExternalProcess) -> Self::Port {
50 external.next_port()
51 }
52
53 fn o2o_sink_source(
54 _env: &(),
55 _p1: &Self::Process,
56 p1_port: &Self::Port,
57 _p2: &Self::Process,
58 p2_port: &Self::Port,
59 ) -> (syn::Expr, syn::Expr) {
60 let p1_port = p1_port.as_str();
61 let p2_port = p2_port.as_str();
62 deploy_o2o(
63 RuntimeData::new("__hydro_lang_trybuild_cli"),
64 p1_port,
65 p2_port,
66 )
67 }
68
69 fn o2o_connect(
70 p1: &Self::Process,
71 p1_port: &Self::Port,
72 p2: &Self::Process,
73 p2_port: &Self::Port,
74 ) -> Box<dyn FnOnce()> {
75 let p1 = p1.clone();
76 let p1_port = p1_port.clone();
77 let p2 = p2.clone();
78 let p2_port = p2_port.clone();
79
80 Box::new(move || {
81 let self_underlying_borrow = p1.underlying.borrow();
82 let self_underlying = self_underlying_borrow.as_ref().unwrap();
83 let source_port = self_underlying
84 .try_read()
85 .unwrap()
86 .get_port(p1_port.clone(), self_underlying);
87
88 let other_underlying_borrow = p2.underlying.borrow();
89 let other_underlying = other_underlying_borrow.as_ref().unwrap();
90 let recipient_port = other_underlying
91 .try_read()
92 .unwrap()
93 .get_port(p2_port.clone(), other_underlying);
94
95 source_port.send_to(&recipient_port)
96 })
97 }
98
99 fn o2m_sink_source(
100 _env: &(),
101 _p1: &Self::Process,
102 p1_port: &Self::Port,
103 _c2: &Self::Cluster,
104 c2_port: &Self::Port,
105 ) -> (syn::Expr, syn::Expr) {
106 let p1_port = p1_port.as_str();
107 let c2_port = c2_port.as_str();
108 deploy_o2m(
109 RuntimeData::new("__hydro_lang_trybuild_cli"),
110 p1_port,
111 c2_port,
112 )
113 }
114
115 fn o2m_connect(
116 p1: &Self::Process,
117 p1_port: &Self::Port,
118 c2: &Self::Cluster,
119 c2_port: &Self::Port,
120 ) -> Box<dyn FnOnce()> {
121 let p1 = p1.clone();
122 let p1_port = p1_port.clone();
123 let c2 = c2.clone();
124 let c2_port = c2_port.clone();
125
126 Box::new(move || {
127 let self_underlying_borrow = p1.underlying.borrow();
128 let self_underlying = self_underlying_borrow.as_ref().unwrap();
129 let source_port = self_underlying
130 .try_read()
131 .unwrap()
132 .get_port(p1_port.clone(), self_underlying);
133
134 let recipient_port = DemuxSink {
135 demux: c2
136 .members
137 .borrow()
138 .iter()
139 .enumerate()
140 .map(|(id, c)| {
141 let n = c.underlying.try_read().unwrap();
142 (
143 id as u32,
144 Arc::new(n.get_port(c2_port.clone(), &c.underlying))
145 as Arc<dyn RustCrateSink + 'static>,
146 )
147 })
148 .collect(),
149 };
150
151 source_port.send_to(&recipient_port)
152 })
153 }
154
155 fn m2o_sink_source(
156 _env: &(),
157 _c1: &Self::Cluster,
158 c1_port: &Self::Port,
159 _p2: &Self::Process,
160 p2_port: &Self::Port,
161 ) -> (syn::Expr, syn::Expr) {
162 let c1_port = c1_port.as_str();
163 let p2_port = p2_port.as_str();
164 deploy_m2o(
165 RuntimeData::new("__hydro_lang_trybuild_cli"),
166 c1_port,
167 p2_port,
168 )
169 }
170
171 fn m2o_connect(
172 c1: &Self::Cluster,
173 c1_port: &Self::Port,
174 p2: &Self::Process,
175 p2_port: &Self::Port,
176 ) -> Box<dyn FnOnce()> {
177 let c1 = c1.clone();
178 let c1_port = c1_port.clone();
179 let p2 = p2.clone();
180 let p2_port = p2_port.clone();
181
182 Box::new(move || {
183 let other_underlying_borrow = p2.underlying.borrow();
184 let other_underlying = other_underlying_borrow.as_ref().unwrap();
185 let recipient_port = other_underlying
186 .try_read()
187 .unwrap()
188 .get_port(p2_port.clone(), other_underlying)
189 .merge();
190
191 for (i, node) in c1.members.borrow().iter().enumerate() {
192 let source_port = node
193 .underlying
194 .try_read()
195 .unwrap()
196 .get_port(c1_port.clone(), &node.underlying);
197
198 TaggedSource {
199 source: Arc::new(source_port),
200 tag: i as u32,
201 }
202 .send_to(&recipient_port);
203 }
204 })
205 }
206
207 fn m2m_sink_source(
208 _env: &(),
209 _c1: &Self::Cluster,
210 c1_port: &Self::Port,
211 _c2: &Self::Cluster,
212 c2_port: &Self::Port,
213 ) -> (syn::Expr, syn::Expr) {
214 let c1_port = c1_port.as_str();
215 let c2_port = c2_port.as_str();
216 deploy_m2m(
217 RuntimeData::new("__hydro_lang_trybuild_cli"),
218 c1_port,
219 c2_port,
220 )
221 }
222
223 fn m2m_connect(
224 c1: &Self::Cluster,
225 c1_port: &Self::Port,
226 c2: &Self::Cluster,
227 c2_port: &Self::Port,
228 ) -> Box<dyn FnOnce()> {
229 let c1 = c1.clone();
230 let c1_port = c1_port.clone();
231 let c2 = c2.clone();
232 let c2_port = c2_port.clone();
233
234 Box::new(move || {
235 for (i, sender) in c1.members.borrow().iter().enumerate() {
236 let source_port = sender
237 .underlying
238 .try_read()
239 .unwrap()
240 .get_port(c1_port.clone(), &sender.underlying);
241
242 let recipient_port = DemuxSink {
243 demux: c2
244 .members
245 .borrow()
246 .iter()
247 .enumerate()
248 .map(|(id, c)| {
249 let n = c.underlying.try_read().unwrap();
250 (
251 id as u32,
252 Arc::new(n.get_port(c2_port.clone(), &c.underlying).merge())
253 as Arc<dyn RustCrateSink + 'static>,
254 )
255 })
256 .collect(),
257 };
258
259 TaggedSource {
260 source: Arc::new(source_port),
261 tag: i as u32,
262 }
263 .send_to(&recipient_port);
264 }
265 })
266 }
267
268 fn e2o_source(
269 _compile_env: &Self::CompileEnv,
270 _p1: &Self::ExternalProcess,
271 p1_port: &Self::Port,
272 _p2: &Self::Process,
273 p2_port: &Self::Port,
274 ) -> syn::Expr {
275 let p1_port = p1_port.as_str();
276 let p2_port = p2_port.as_str();
277 deploy_e2o(
278 RuntimeData::new("__hydro_lang_trybuild_cli"),
279 p1_port,
280 p2_port,
281 )
282 }
283
284 fn e2o_connect(
285 p1: &Self::ExternalProcess,
286 p1_port: &Self::Port,
287 p2: &Self::Process,
288 p2_port: &Self::Port,
289 ) -> Box<dyn FnOnce()> {
290 let p1 = p1.clone();
291 let p1_port = p1_port.clone();
292 let p2 = p2.clone();
293 let p2_port = p2_port.clone();
294
295 Box::new(move || {
296 let self_underlying_borrow = p1.underlying.borrow();
297 let self_underlying = self_underlying_borrow.as_ref().unwrap();
298 let source_port = self_underlying
299 .try_read()
300 .unwrap()
301 .declare_client(self_underlying);
302
303 let other_underlying_borrow = p2.underlying.borrow();
304 let other_underlying = other_underlying_borrow.as_ref().unwrap();
305 let recipient_port = other_underlying
306 .try_read()
307 .unwrap()
308 .get_port(p2_port.clone(), other_underlying);
309
310 source_port.send_to(&recipient_port);
311
312 p1.client_ports
313 .borrow_mut()
314 .insert(p1_port.clone(), source_port);
315 })
316 }
317
318 fn o2e_sink(
319 _compile_env: &Self::CompileEnv,
320 _p1: &Self::Process,
321 p1_port: &Self::Port,
322 _p2: &Self::ExternalProcess,
323 p2_port: &Self::Port,
324 ) -> syn::Expr {
325 let p1_port = p1_port.as_str();
326 let p2_port = p2_port.as_str();
327 deploy_o2e(
328 RuntimeData::new("__hydro_lang_trybuild_cli"),
329 p1_port,
330 p2_port,
331 )
332 }
333
334 fn o2e_connect(
335 p1: &Self::Process,
336 p1_port: &Self::Port,
337 p2: &Self::ExternalProcess,
338 p2_port: &Self::Port,
339 ) -> Box<dyn FnOnce()> {
340 let p1 = p1.clone();
341 let p1_port = p1_port.clone();
342 let p2 = p2.clone();
343 let p2_port = p2_port.clone();
344
345 Box::new(move || {
346 let self_underlying_borrow = p1.underlying.borrow();
347 let self_underlying = self_underlying_borrow.as_ref().unwrap();
348 let source_port = self_underlying
349 .try_read()
350 .unwrap()
351 .get_port(p1_port.clone(), self_underlying);
352
353 let other_underlying_borrow = p2.underlying.borrow();
354 let other_underlying = other_underlying_borrow.as_ref().unwrap();
355 let recipient_port = other_underlying
356 .try_read()
357 .unwrap()
358 .declare_client(other_underlying);
359
360 source_port.send_to(&recipient_port);
361
362 p2.client_ports
363 .borrow_mut()
364 .insert(p2_port.clone(), recipient_port);
365 })
366 }
367
368 fn cluster_ids(
369 _env: &Self::CompileEnv,
370 of_cluster: usize,
371 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
372 cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
373 }
374
375 fn cluster_self_id(_env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
376 cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
377 }
378}
379
380pub trait DeployCrateWrapper {
381 fn underlying(&self) -> Arc<RwLock<RustCrateService>>;
382
383 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
384 async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
385 self.underlying().read().await.stdout()
386 }
387
388 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
389 async fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
390 self.underlying().read().await.stderr()
391 }
392
393 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
394 async fn stdout_filter(
395 &self,
396 prefix: impl Into<String>,
397 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
398 self.underlying().read().await.stdout_filter(prefix.into())
399 }
400
401 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
402 async fn stderr_filter(
403 &self,
404 prefix: impl Into<String>,
405 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
406 self.underlying().read().await.stderr_filter(prefix.into())
407 }
408
409 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
410 async fn tracing_results(&self) -> Option<TracingResults> {
411 self.underlying().read().await.tracing_results().cloned()
412 }
413}
414
415#[derive(Clone)]
416pub struct TrybuildHost {
417 pub host: Arc<dyn Host>,
418 pub display_name: Option<String>,
419 pub rustflags: Option<String>,
420 pub additional_hydro_features: Vec<String>,
421 pub tracing: Option<TracingOptions>,
422 pub name_hint: Option<String>,
423 pub cluster_idx: Option<usize>,
424}
425
426impl From<Arc<dyn Host>> for TrybuildHost {
427 fn from(host: Arc<dyn Host>) -> Self {
428 Self {
429 host,
430 display_name: None,
431 rustflags: None,
432 additional_hydro_features: vec![],
433 tracing: None,
434 name_hint: None,
435 cluster_idx: None,
436 }
437 }
438}
439
440impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
441 fn from(host: Arc<H>) -> Self {
442 Self {
443 host,
444 display_name: None,
445 rustflags: None,
446 additional_hydro_features: vec![],
447 tracing: None,
448 name_hint: None,
449 cluster_idx: None,
450 }
451 }
452}
453
454impl TrybuildHost {
455 pub fn new(host: Arc<dyn Host>) -> Self {
456 Self {
457 host,
458 display_name: None,
459 rustflags: None,
460 additional_hydro_features: vec![],
461 tracing: None,
462 name_hint: None,
463 cluster_idx: None,
464 }
465 }
466
467 pub fn display_name(self, display_name: impl Into<String>) -> Self {
468 if self.display_name.is_some() {
469 panic!("{} already set", name_of!(display_name in Self));
470 }
471
472 Self {
473 display_name: Some(display_name.into()),
474 ..self
475 }
476 }
477
478 pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
479 if self.rustflags.is_some() {
480 panic!("{} already set", name_of!(rustflags in Self));
481 }
482
483 Self {
484 rustflags: Some(rustflags.into()),
485 ..self
486 }
487 }
488
489 pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
490 Self {
491 additional_hydro_features,
492 ..self
493 }
494 }
495
496 pub fn tracing(self, tracing: TracingOptions) -> Self {
497 if self.tracing.is_some() {
498 panic!("{} already set", name_of!(tracing in Self));
499 }
500
501 Self {
502 tracing: Some(tracing),
503 ..self
504 }
505 }
506}
507
508impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
509 type ProcessSpec = TrybuildHost;
510 fn into_process_spec(self) -> TrybuildHost {
511 TrybuildHost {
512 host: self,
513 display_name: None,
514 rustflags: None,
515 additional_hydro_features: vec![],
516 tracing: None,
517 name_hint: None,
518 cluster_idx: None,
519 }
520 }
521}
522
523impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
524 type ProcessSpec = TrybuildHost;
525 fn into_process_spec(self) -> TrybuildHost {
526 TrybuildHost {
527 host: self,
528 display_name: None,
529 rustflags: None,
530 additional_hydro_features: vec![],
531 tracing: None,
532 name_hint: None,
533 cluster_idx: None,
534 }
535 }
536}
537
538#[derive(Clone)]
539pub struct DeployExternal {
540 next_port: Rc<RefCell<usize>>,
541 host: Arc<dyn Host>,
542 underlying: Rc<RefCell<Option<Arc<RwLock<CustomService>>>>>,
543 client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
544 allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
545}
546
547impl DeployExternal {
548 pub fn take_port(&self, key: usize) -> CustomClientPort {
549 self.client_ports
550 .borrow_mut()
551 .remove(self.allocated_ports.borrow().get(&key).unwrap())
552 .unwrap()
553 }
554}
555
556impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
557 fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
558 self.allocated_ports.borrow_mut().insert(key, port);
559 }
560
561 fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy>::ExternalRawPort {
562 self.client_ports
563 .borrow_mut()
564 .remove(self.allocated_ports.borrow().get(&key).unwrap())
565 .unwrap()
566 }
567
568 fn as_bytes_sink(
569 &self,
570 key: usize,
571 ) -> impl Future<Output = Pin<Box<dyn Sink<Bytes, Error = Error>>>> + 'a {
572 let port = self.raw_port(key);
573 async move {
574 let sink = port.connect().await.into_sink();
575 sink as Pin<Box<dyn Sink<Bytes, Error = Error>>>
576 }
577 }
578
579 fn as_bincode_sink<T: Serialize + 'static>(
580 &self,
581 key: usize,
582 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
583 let port = self.raw_port(key);
584 async move {
585 let sink = port.connect().await.into_sink();
586 Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
587 as Pin<Box<dyn Sink<T, Error = Error>>>
588 }
589 }
590
591 fn as_bytes_source(
592 &self,
593 key: usize,
594 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = Bytes>>>> + 'a {
595 let port = self.raw_port(key);
596 async move {
597 let source = port.connect().await.into_source();
598 Box::pin(source.map(|r| r.unwrap().freeze())) as Pin<Box<dyn Stream<Item = Bytes>>>
599 }
600 }
601
602 fn as_bincode_source<T: DeserializeOwned + 'static>(
603 &self,
604 key: usize,
605 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
606 let port = self.raw_port(key);
607 async move {
608 let source = port.connect().await.into_source();
609 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
610 as Pin<Box<dyn Stream<Item = T>>>
611 }
612 }
613}
614
615impl Node for DeployExternal {
616 type Port = String;
617 type Meta = HashMap<usize, Vec<u32>>;
618 type InstantiateEnv = Deployment;
619
620 fn next_port(&self) -> Self::Port {
621 let next_port = *self.next_port.borrow();
622 *self.next_port.borrow_mut() += 1;
623
624 format!("port_{}", next_port)
625 }
626
627 fn instantiate(
628 &self,
629 env: &mut Self::InstantiateEnv,
630 _meta: &mut Self::Meta,
631 _graph: DfirGraph,
632 _extra_stmts: Vec<syn::Stmt>,
633 ) {
634 let service = env.CustomService(self.host.clone(), vec![]);
635 *self.underlying.borrow_mut() = Some(service);
636 }
637
638 fn update_meta(&mut self, _meta: &Self::Meta) {}
639}
640
641impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
642 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
643 DeployExternal {
644 next_port: Rc::new(RefCell::new(0)),
645 host: self,
646 underlying: Rc::new(RefCell::new(None)),
647 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
648 client_ports: Rc::new(RefCell::new(HashMap::new())),
649 }
650 }
651}
652
653impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
654 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
655 DeployExternal {
656 next_port: Rc::new(RefCell::new(0)),
657 host: self,
658 underlying: Rc::new(RefCell::new(None)),
659 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
660 client_ports: Rc::new(RefCell::new(HashMap::new())),
661 }
662 }
663}
664
665pub enum CrateOrTrybuild {
666 Crate(RustCrate),
667 Trybuild(TrybuildHost),
668}
669
670#[derive(Clone)]
671pub struct DeployNode {
672 id: usize,
673 next_port: Rc<RefCell<usize>>,
674 service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
675 underlying: Rc<RefCell<Option<Arc<RwLock<RustCrateService>>>>>,
676}
677
678impl DeployCrateWrapper for DeployNode {
679 fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
680 self.underlying.borrow().as_ref().unwrap().clone()
681 }
682}
683
684impl Node for DeployNode {
685 type Port = String;
686 type Meta = HashMap<usize, Vec<u32>>;
687 type InstantiateEnv = Deployment;
688
689 fn next_port(&self) -> String {
690 let next_port = *self.next_port.borrow();
691 *self.next_port.borrow_mut() += 1;
692
693 format!("port_{}", next_port)
694 }
695
696 fn update_meta(&mut self, meta: &Self::Meta) {
697 let underlying_node = self.underlying.borrow();
698 let mut n = underlying_node.as_ref().unwrap().try_write().unwrap();
699 n.update_meta(HydroMeta {
700 clusters: meta.clone(),
701 cluster_id: None,
702 subgraph_id: self.id,
703 });
704 }
705
706 fn instantiate(
707 &self,
708 env: &mut Self::InstantiateEnv,
709 _meta: &mut Self::Meta,
710 graph: DfirGraph,
711 extra_stmts: Vec<syn::Stmt>,
712 ) {
713 let service = match self.service_spec.borrow_mut().take().unwrap() {
714 CrateOrTrybuild::Crate(c) => c,
715 CrateOrTrybuild::Trybuild(trybuild) => {
716 let (bin_name, (dir, target_dir, features)) =
717 create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint);
718 create_trybuild_service(trybuild, &dir, &target_dir, &features, &bin_name)
719 }
720 };
721
722 *self.underlying.borrow_mut() = Some(env.add_service(service));
723 }
724}
725
726#[derive(Clone)]
727pub struct DeployClusterNode {
728 underlying: Arc<RwLock<RustCrateService>>,
729}
730
731impl DeployCrateWrapper for DeployClusterNode {
732 fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
733 self.underlying.clone()
734 }
735}
736
737#[derive(Clone)]
738pub struct DeployCluster {
739 id: usize,
740 next_port: Rc<RefCell<usize>>,
741 cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
742 members: Rc<RefCell<Vec<DeployClusterNode>>>,
743 name_hint: Option<String>,
744}
745
746impl DeployCluster {
747 pub fn members(&self) -> Vec<DeployClusterNode> {
748 self.members.borrow().clone()
749 }
750}
751
752impl Node for DeployCluster {
753 type Port = String;
754 type Meta = HashMap<usize, Vec<u32>>;
755 type InstantiateEnv = Deployment;
756
757 fn next_port(&self) -> String {
758 let next_port = *self.next_port.borrow();
759 *self.next_port.borrow_mut() += 1;
760
761 format!("port_{}", next_port)
762 }
763
764 fn instantiate(
765 &self,
766 env: &mut Self::InstantiateEnv,
767 meta: &mut Self::Meta,
768 graph: DfirGraph,
769 extra_stmts: Vec<syn::Stmt>,
770 ) {
771 let has_trybuild = self
772 .cluster_spec
773 .borrow()
774 .as_ref()
775 .unwrap()
776 .iter()
777 .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
778
779 let maybe_trybuild = if has_trybuild {
780 Some(create_graph_trybuild(graph, extra_stmts, &self.name_hint))
781 } else {
782 None
783 };
784
785 let cluster_nodes = self
786 .cluster_spec
787 .borrow_mut()
788 .take()
789 .unwrap()
790 .into_iter()
791 .map(|spec| {
792 let service = match spec {
793 CrateOrTrybuild::Crate(c) => c,
794 CrateOrTrybuild::Trybuild(trybuild) => {
795 let (bin_name, (dir, target_dir, features)) =
796 maybe_trybuild.as_ref().unwrap();
797 create_trybuild_service(trybuild, dir, target_dir, features, bin_name)
798 }
799 };
800
801 env.add_service(service)
802 })
803 .collect::<Vec<_>>();
804 meta.insert(self.id, (0..(cluster_nodes.len() as u32)).collect());
805 *self.members.borrow_mut() = cluster_nodes
806 .into_iter()
807 .map(|n| DeployClusterNode { underlying: n })
808 .collect();
809 }
810
811 fn update_meta(&mut self, meta: &Self::Meta) {
812 for (cluster_id, node) in self.members.borrow().iter().enumerate() {
813 let mut n = node.underlying.try_write().unwrap();
814 n.update_meta(HydroMeta {
815 clusters: meta.clone(),
816 cluster_id: Some(cluster_id as u32),
817 subgraph_id: self.id,
818 });
819 }
820 }
821}
822
823#[derive(Clone)]
824pub struct DeployProcessSpec(RustCrate);
825
826impl DeployProcessSpec {
827 pub fn new(t: RustCrate) -> Self {
828 Self(t)
829 }
830}
831
832impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
833 fn build(self, id: usize, _name_hint: &str) -> DeployNode {
834 DeployNode {
835 id,
836 next_port: Rc::new(RefCell::new(0)),
837 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0)))),
838 underlying: Rc::new(RefCell::new(None)),
839 }
840 }
841}
842
843impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
844 fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
845 self.name_hint = Some(format!("{} (process {id})", name_hint));
846 DeployNode {
847 id,
848 next_port: Rc::new(RefCell::new(0)),
849 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
850 underlying: Rc::new(RefCell::new(None)),
851 }
852 }
853}
854
855#[derive(Clone)]
856pub struct DeployClusterSpec(Vec<RustCrate>);
857
858impl DeployClusterSpec {
859 pub fn new(crates: Vec<RustCrate>) -> Self {
860 Self(crates)
861 }
862}
863
864impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
865 fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
866 DeployCluster {
867 id,
868 next_port: Rc::new(RefCell::new(0)),
869 cluster_spec: Rc::new(RefCell::new(Some(
870 self.0.into_iter().map(CrateOrTrybuild::Crate).collect(),
871 ))),
872 members: Rc::new(RefCell::new(vec![])),
873 name_hint: None,
874 }
875 }
876}
877
878impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
879 fn build(self, id: usize, name_hint: &str) -> DeployCluster {
880 let name_hint = format!("{} (cluster {id})", name_hint);
881 DeployCluster {
882 id,
883 next_port: Rc::new(RefCell::new(0)),
884 cluster_spec: Rc::new(RefCell::new(Some(
885 self.into_iter()
886 .enumerate()
887 .map(|(idx, b)| {
888 let mut b = b.into();
889 b.name_hint = Some(name_hint.clone());
890 b.cluster_idx = Some(idx);
891 CrateOrTrybuild::Trybuild(b)
892 })
893 .collect(),
894 ))),
895 members: Rc::new(RefCell::new(vec![])),
896 name_hint: Some(name_hint),
897 }
898 }
899}
900
901fn create_trybuild_service(
902 trybuild: TrybuildHost,
903 dir: &std::path::PathBuf,
904 target_dir: &std::path::PathBuf,
905 features: &Option<Vec<String>>,
906 bin_name: &str,
907) -> RustCrate {
908 let mut ret = RustCrate::new(dir, trybuild.host)
909 .target_dir(target_dir)
910 .bin(bin_name)
911 .no_default_features();
912
913 if let Some(display_name) = trybuild.display_name {
914 ret = ret.display_name(display_name);
915 } else if let Some(name_hint) = trybuild.name_hint {
916 if let Some(cluster_idx) = trybuild.cluster_idx {
917 ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
918 } else {
919 ret = ret.display_name(name_hint);
920 }
921 }
922
923 if let Some(rustflags) = trybuild.rustflags {
924 ret = ret.rustflags(rustflags);
925 }
926
927 if let Some(tracing) = trybuild.tracing {
928 ret = ret.tracing(tracing);
929 }
930
931 ret = ret.features(
932 trybuild
933 .additional_hydro_features
934 .into_iter()
935 .map(|runtime_feature| {
936 assert!(
937 HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
938 "{runtime_feature} is not a valid Hydro runtime feature"
939 );
940 format!("hydro___feature_{runtime_feature}")
941 }),
942 );
943
944 if let Some(features) = features {
945 ret = ret.features(features);
946 }
947
948 ret
949}