1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::future::Future;
4use std::io::Error;
5use std::pin::Pin;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use bytes::{Bytes, BytesMut};
10use dfir_lang::graph::DfirGraph;
11use futures::{Sink, SinkExt, Stream, StreamExt};
12use hydro_deploy::custom_service::CustomClientPort;
13use hydro_deploy::rust_crate::RustCrateService;
14use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
15use hydro_deploy::rust_crate::tracing_options::TracingOptions;
16use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
17use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
18use nameof::name_of;
19use proc_macro2::Span;
20use serde::Serialize;
21use serde::de::DeserializeOwned;
22use stageleft::{QuotedWithContext, RuntimeData};
23use syn::parse_quote;
24use tokio::sync::RwLock;
25
26use super::trybuild::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
27use super::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort};
28use crate::NetworkHint;
29use crate::deploy_runtime::*;
30use crate::staging_util::get_this_crate;
31
32pub struct HydroDeploy {}
33
34impl<'a> Deploy<'a> for HydroDeploy {
35 type InstantiateEnv = Deployment;
36 type CompileEnv = ();
37 type Process = DeployNode;
38 type Cluster = DeployCluster;
39 type External = DeployExternal;
40 type Meta = HashMap<usize, Vec<u32>>;
41 type GraphId = ();
42 type Port = String;
43 type ExternalRawPort = CustomClientPort;
44
45 fn allocate_process_port(process: &Self::Process) -> Self::Port {
46 process.next_port()
47 }
48
49 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
50 cluster.next_port()
51 }
52
53 fn allocate_external_port(external: &Self::External) -> Self::Port {
54 external.next_port()
55 }
56
57 fn o2o_sink_source(
58 _env: &(),
59 _p1: &Self::Process,
60 p1_port: &Self::Port,
61 _p2: &Self::Process,
62 p2_port: &Self::Port,
63 ) -> (syn::Expr, syn::Expr) {
64 let p1_port = p1_port.as_str();
65 let p2_port = p2_port.as_str();
66 deploy_o2o(
67 RuntimeData::new("__hydro_lang_trybuild_cli"),
68 p1_port,
69 p2_port,
70 )
71 }
72
73 fn o2o_connect(
74 p1: &Self::Process,
75 p1_port: &Self::Port,
76 p2: &Self::Process,
77 p2_port: &Self::Port,
78 ) -> Box<dyn FnOnce()> {
79 let p1 = p1.clone();
80 let p1_port = p1_port.clone();
81 let p2 = p2.clone();
82 let p2_port = p2_port.clone();
83
84 Box::new(move || {
85 let self_underlying_borrow = p1.underlying.borrow();
86 let self_underlying = self_underlying_borrow.as_ref().unwrap();
87 let source_port = self_underlying
88 .try_read()
89 .unwrap()
90 .get_port(p1_port.clone(), self_underlying);
91
92 let other_underlying_borrow = p2.underlying.borrow();
93 let other_underlying = other_underlying_borrow.as_ref().unwrap();
94 let recipient_port = other_underlying
95 .try_read()
96 .unwrap()
97 .get_port(p2_port.clone(), other_underlying);
98
99 source_port.send_to(&recipient_port)
100 })
101 }
102
103 fn o2m_sink_source(
104 _env: &(),
105 _p1: &Self::Process,
106 p1_port: &Self::Port,
107 _c2: &Self::Cluster,
108 c2_port: &Self::Port,
109 ) -> (syn::Expr, syn::Expr) {
110 let p1_port = p1_port.as_str();
111 let c2_port = c2_port.as_str();
112 deploy_o2m(
113 RuntimeData::new("__hydro_lang_trybuild_cli"),
114 p1_port,
115 c2_port,
116 )
117 }
118
119 fn o2m_connect(
120 p1: &Self::Process,
121 p1_port: &Self::Port,
122 c2: &Self::Cluster,
123 c2_port: &Self::Port,
124 ) -> Box<dyn FnOnce()> {
125 let p1 = p1.clone();
126 let p1_port = p1_port.clone();
127 let c2 = c2.clone();
128 let c2_port = c2_port.clone();
129
130 Box::new(move || {
131 let self_underlying_borrow = p1.underlying.borrow();
132 let self_underlying = self_underlying_borrow.as_ref().unwrap();
133 let source_port = self_underlying
134 .try_read()
135 .unwrap()
136 .get_port(p1_port.clone(), self_underlying);
137
138 let recipient_port = DemuxSink {
139 demux: c2
140 .members
141 .borrow()
142 .iter()
143 .enumerate()
144 .map(|(id, c)| {
145 let n = c.underlying.try_read().unwrap();
146 (
147 id as u32,
148 Arc::new(n.get_port(c2_port.clone(), &c.underlying))
149 as Arc<dyn RustCrateSink + 'static>,
150 )
151 })
152 .collect(),
153 };
154
155 source_port.send_to(&recipient_port)
156 })
157 }
158
159 fn m2o_sink_source(
160 _env: &(),
161 _c1: &Self::Cluster,
162 c1_port: &Self::Port,
163 _p2: &Self::Process,
164 p2_port: &Self::Port,
165 ) -> (syn::Expr, syn::Expr) {
166 let c1_port = c1_port.as_str();
167 let p2_port = p2_port.as_str();
168 deploy_m2o(
169 RuntimeData::new("__hydro_lang_trybuild_cli"),
170 c1_port,
171 p2_port,
172 )
173 }
174
175 fn m2o_connect(
176 c1: &Self::Cluster,
177 c1_port: &Self::Port,
178 p2: &Self::Process,
179 p2_port: &Self::Port,
180 ) -> Box<dyn FnOnce()> {
181 let c1 = c1.clone();
182 let c1_port = c1_port.clone();
183 let p2 = p2.clone();
184 let p2_port = p2_port.clone();
185
186 Box::new(move || {
187 let other_underlying_borrow = p2.underlying.borrow();
188 let other_underlying = other_underlying_borrow.as_ref().unwrap();
189 let recipient_port = other_underlying
190 .try_read()
191 .unwrap()
192 .get_port(p2_port.clone(), other_underlying)
193 .merge();
194
195 for (i, node) in c1.members.borrow().iter().enumerate() {
196 let source_port = node
197 .underlying
198 .try_read()
199 .unwrap()
200 .get_port(c1_port.clone(), &node.underlying);
201
202 TaggedSource {
203 source: Arc::new(source_port),
204 tag: i as u32,
205 }
206 .send_to(&recipient_port);
207 }
208 })
209 }
210
211 fn m2m_sink_source(
212 _env: &(),
213 _c1: &Self::Cluster,
214 c1_port: &Self::Port,
215 _c2: &Self::Cluster,
216 c2_port: &Self::Port,
217 ) -> (syn::Expr, syn::Expr) {
218 let c1_port = c1_port.as_str();
219 let c2_port = c2_port.as_str();
220 deploy_m2m(
221 RuntimeData::new("__hydro_lang_trybuild_cli"),
222 c1_port,
223 c2_port,
224 )
225 }
226
227 fn m2m_connect(
228 c1: &Self::Cluster,
229 c1_port: &Self::Port,
230 c2: &Self::Cluster,
231 c2_port: &Self::Port,
232 ) -> Box<dyn FnOnce()> {
233 let c1 = c1.clone();
234 let c1_port = c1_port.clone();
235 let c2 = c2.clone();
236 let c2_port = c2_port.clone();
237
238 Box::new(move || {
239 for (i, sender) in c1.members.borrow().iter().enumerate() {
240 let source_port = sender
241 .underlying
242 .try_read()
243 .unwrap()
244 .get_port(c1_port.clone(), &sender.underlying);
245
246 let recipient_port = DemuxSink {
247 demux: c2
248 .members
249 .borrow()
250 .iter()
251 .enumerate()
252 .map(|(id, c)| {
253 let n = c.underlying.try_read().unwrap();
254 (
255 id as u32,
256 Arc::new(n.get_port(c2_port.clone(), &c.underlying).merge())
257 as Arc<dyn RustCrateSink + 'static>,
258 )
259 })
260 .collect(),
261 };
262
263 TaggedSource {
264 source: Arc::new(source_port),
265 tag: i as u32,
266 }
267 .send_to(&recipient_port);
268 }
269 })
270 }
271
272 fn e2o_many_source(
273 _compile_env: &Self::CompileEnv,
274 extra_stmts: &mut Vec<syn::Stmt>,
275 _p2: &Self::Process,
276 p2_port: &Self::Port,
277 codec_type: &syn::Type,
278 shared_handle: String,
279 ) -> syn::Expr {
280 let connect_ident = syn::Ident::new(
281 &format!("__hydro_deploy_many_{}_connect", &shared_handle),
282 Span::call_site(),
283 );
284 let source_ident = syn::Ident::new(
285 &format!("__hydro_deploy_many_{}_source", &shared_handle),
286 Span::call_site(),
287 );
288 let sink_ident = syn::Ident::new(
289 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
290 Span::call_site(),
291 );
292 let membership_ident = syn::Ident::new(
293 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
294 Span::call_site(),
295 );
296
297 let root = get_this_crate();
298
299 extra_stmts.push(syn::parse_quote! {
300 let #connect_ident = __hydro_lang_trybuild_cli
301 .port(#p2_port)
302 .connect::<#root::runtime_support::dfir_rs::util::deploy::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
303 });
304
305 extra_stmts.push(syn::parse_quote! {
306 let #source_ident = #connect_ident.source;
307 });
308
309 extra_stmts.push(syn::parse_quote! {
310 let #sink_ident = #connect_ident.sink;
311 });
312
313 extra_stmts.push(syn::parse_quote! {
314 let #membership_ident = #connect_ident.membership;
315 });
316
317 parse_quote!(#source_ident)
318 }
319
320 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
321 let sink_ident = syn::Ident::new(
322 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
323 Span::call_site(),
324 );
325 parse_quote!(#sink_ident)
326 }
327
328 fn e2o_source(
329 _compile_env: &Self::CompileEnv,
330 _p1: &Self::External,
331 p1_port: &Self::Port,
332 _p2: &Self::Process,
333 p2_port: &Self::Port,
334 ) -> syn::Expr {
335 let p1_port = p1_port.as_str();
336 let p2_port = p2_port.as_str();
337 deploy_e2o(
338 RuntimeData::new("__hydro_lang_trybuild_cli"),
339 p1_port,
340 p2_port,
341 )
342 }
343
344 fn e2o_connect(
345 p1: &Self::External,
346 p1_port: &Self::Port,
347 p2: &Self::Process,
348 p2_port: &Self::Port,
349 many: bool,
350 server_hint: NetworkHint,
351 ) -> Box<dyn FnOnce()> {
352 let p1 = p1.clone();
353 let p1_port = p1_port.clone();
354 let p2 = p2.clone();
355 let p2_port = p2_port.clone();
356
357 Box::new(move || {
358 let self_underlying_borrow = p1.underlying.borrow();
359 let self_underlying = self_underlying_borrow.as_ref().unwrap();
360 let source_port = if many {
361 self_underlying
362 .try_read()
363 .unwrap()
364 .declare_many_client(self_underlying)
365 } else {
366 self_underlying
367 .try_read()
368 .unwrap()
369 .declare_client(self_underlying)
370 };
371
372 let other_underlying_borrow = p2.underlying.borrow();
373 let other_underlying = other_underlying_borrow.as_ref().unwrap();
374 let recipient_port = other_underlying.try_read().unwrap().get_port_with_hint(
375 p2_port.clone(),
376 match server_hint {
377 NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
378 NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
379 },
380 other_underlying,
381 );
382
383 source_port.send_to(&recipient_port);
384
385 p1.client_ports
386 .borrow_mut()
387 .insert(p1_port.clone(), source_port);
388 })
389 }
390
391 fn o2e_sink(
392 _compile_env: &Self::CompileEnv,
393 _p1: &Self::Process,
394 p1_port: &Self::Port,
395 _p2: &Self::External,
396 p2_port: &Self::Port,
397 ) -> syn::Expr {
398 let p1_port = p1_port.as_str();
399 let p2_port = p2_port.as_str();
400 deploy_o2e(
401 RuntimeData::new("__hydro_lang_trybuild_cli"),
402 p1_port,
403 p2_port,
404 )
405 }
406
407 fn o2e_connect(
408 p1: &Self::Process,
409 p1_port: &Self::Port,
410 p2: &Self::External,
411 p2_port: &Self::Port,
412 ) -> Box<dyn FnOnce()> {
413 let p1 = p1.clone();
414 let p1_port = p1_port.clone();
415 let p2 = p2.clone();
416 let p2_port = p2_port.clone();
417
418 Box::new(move || {
419 let self_underlying_borrow = p1.underlying.borrow();
420 let self_underlying = self_underlying_borrow.as_ref().unwrap();
421 let source_port = self_underlying
422 .try_read()
423 .unwrap()
424 .get_port(p1_port.clone(), self_underlying);
425
426 let other_underlying_borrow = p2.underlying.borrow();
427 let other_underlying = other_underlying_borrow.as_ref().unwrap();
428 let recipient_port = other_underlying
429 .try_read()
430 .unwrap()
431 .declare_client(other_underlying);
432
433 source_port.send_to(&recipient_port);
434
435 p2.client_ports
436 .borrow_mut()
437 .insert(p2_port.clone(), recipient_port);
438 })
439 }
440
441 fn cluster_ids(
442 _env: &Self::CompileEnv,
443 of_cluster: usize,
444 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
445 cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
446 }
447
448 fn cluster_self_id(_env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
449 cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
450 }
451}
452
453pub trait DeployCrateWrapper {
454 fn underlying(&self) -> Arc<RwLock<RustCrateService>>;
455
456 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
457 async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
458 self.underlying().read().await.stdout()
459 }
460
461 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
462 async fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
463 self.underlying().read().await.stderr()
464 }
465
466 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
467 async fn stdout_filter(
468 &self,
469 prefix: impl Into<String>,
470 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
471 self.underlying().read().await.stdout_filter(prefix.into())
472 }
473
474 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
475 async fn stderr_filter(
476 &self,
477 prefix: impl Into<String>,
478 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
479 self.underlying().read().await.stderr_filter(prefix.into())
480 }
481
482 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
483 async fn tracing_results(&self) -> Option<TracingResults> {
484 self.underlying().read().await.tracing_results().cloned()
485 }
486}
487
488#[derive(Clone)]
489pub struct TrybuildHost {
490 pub host: Arc<dyn Host>,
491 pub display_name: Option<String>,
492 pub rustflags: Option<String>,
493 pub additional_hydro_features: Vec<String>,
494 pub features: Vec<String>,
495 pub tracing: Option<TracingOptions>,
496 pub name_hint: Option<String>,
497 pub cluster_idx: Option<usize>,
498}
499
500impl From<Arc<dyn Host>> for TrybuildHost {
501 fn from(host: Arc<dyn Host>) -> Self {
502 Self {
503 host,
504 display_name: None,
505 rustflags: None,
506 additional_hydro_features: vec![],
507 features: vec![],
508 tracing: None,
509 name_hint: None,
510 cluster_idx: None,
511 }
512 }
513}
514
515impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
516 fn from(host: Arc<H>) -> Self {
517 Self {
518 host,
519 display_name: None,
520 rustflags: None,
521 additional_hydro_features: vec![],
522 features: vec![],
523 tracing: None,
524 name_hint: None,
525 cluster_idx: None,
526 }
527 }
528}
529
530impl TrybuildHost {
531 pub fn new(host: Arc<dyn Host>) -> Self {
532 Self {
533 host,
534 display_name: None,
535 rustflags: None,
536 additional_hydro_features: vec![],
537 features: vec![],
538 tracing: None,
539 name_hint: None,
540 cluster_idx: None,
541 }
542 }
543
544 pub fn display_name(self, display_name: impl Into<String>) -> Self {
545 if self.display_name.is_some() {
546 panic!("{} already set", name_of!(display_name in Self));
547 }
548
549 Self {
550 display_name: Some(display_name.into()),
551 ..self
552 }
553 }
554
555 pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
556 if self.rustflags.is_some() {
557 panic!("{} already set", name_of!(rustflags in Self));
558 }
559
560 Self {
561 rustflags: Some(rustflags.into()),
562 ..self
563 }
564 }
565
566 pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
567 Self {
568 additional_hydro_features,
569 ..self
570 }
571 }
572
573 pub fn features(self, features: Vec<String>) -> Self {
574 Self {
575 features: self.features.into_iter().chain(features).collect(),
576 ..self
577 }
578 }
579
580 pub fn tracing(self, tracing: TracingOptions) -> Self {
581 if self.tracing.is_some() {
582 panic!("{} already set", name_of!(tracing in Self));
583 }
584
585 Self {
586 tracing: Some(tracing),
587 ..self
588 }
589 }
590}
591
592impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
593 type ProcessSpec = TrybuildHost;
594 fn into_process_spec(self) -> TrybuildHost {
595 TrybuildHost {
596 host: self,
597 display_name: None,
598 rustflags: None,
599 additional_hydro_features: vec![],
600 features: vec![],
601 tracing: None,
602 name_hint: None,
603 cluster_idx: None,
604 }
605 }
606}
607
608impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
609 type ProcessSpec = TrybuildHost;
610 fn into_process_spec(self) -> TrybuildHost {
611 TrybuildHost {
612 host: self,
613 display_name: None,
614 rustflags: None,
615 additional_hydro_features: vec![],
616 features: vec![],
617 tracing: None,
618 name_hint: None,
619 cluster_idx: None,
620 }
621 }
622}
623
624#[derive(Clone)]
625pub struct DeployExternal {
626 next_port: Rc<RefCell<usize>>,
627 host: Arc<dyn Host>,
628 underlying: Rc<RefCell<Option<Arc<RwLock<CustomService>>>>>,
629 client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
630 allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
631}
632
633impl DeployExternal {
634 pub fn take_port(&self, key: usize) -> CustomClientPort {
635 self.client_ports
636 .borrow_mut()
637 .remove(self.allocated_ports.borrow().get(&key).unwrap())
638 .unwrap()
639 }
640}
641
642impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
643 fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
644 self.allocated_ports.borrow_mut().insert(key, port);
645 }
646
647 fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy<'_>>::ExternalRawPort {
648 self.client_ports
649 .borrow()
650 .get(self.allocated_ports.borrow().get(&key).unwrap())
651 .unwrap()
652 .clone()
653 }
654
655 fn as_bytes_bidi(
656 &self,
657 key: usize,
658 ) -> impl Future<
659 Output = (
660 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
661 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
662 ),
663 > + 'a {
664 let port = self.raw_port(key);
665
666 async move {
667 let (source, sink) = port.connect().await.into_source_sink();
668 (
669 Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
670 Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
671 )
672 }
673 }
674
675 fn as_bincode_bidi<InT, OutT>(
676 &self,
677 key: usize,
678 ) -> impl Future<
679 Output = (
680 Pin<Box<dyn Stream<Item = OutT>>>,
681 Pin<Box<dyn Sink<InT, Error = Error>>>,
682 ),
683 > + 'a
684 where
685 InT: Serialize + 'static,
686 OutT: DeserializeOwned + 'static,
687 {
688 let port = self.raw_port(key);
689 async move {
690 let (source, sink) = port.connect().await.into_source_sink();
691 (
692 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
693 as Pin<Box<dyn Stream<Item = OutT>>>,
694 Box::pin(
695 sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
696 ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
697 )
698 }
699 }
700
701 fn as_bincode_sink<T: Serialize + 'static>(
702 &self,
703 key: usize,
704 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
705 let port = self.raw_port(key);
706 async move {
707 let sink = port.connect().await.into_sink();
708 Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
709 as Pin<Box<dyn Sink<T, Error = Error>>>
710 }
711 }
712
713 fn as_bincode_source<T: DeserializeOwned + 'static>(
714 &self,
715 key: usize,
716 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
717 let port = self.raw_port(key);
718 async move {
719 let source = port.connect().await.into_source();
720 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
721 as Pin<Box<dyn Stream<Item = T>>>
722 }
723 }
724}
725
726impl Node for DeployExternal {
727 type Port = String;
728 type Meta = HashMap<usize, Vec<u32>>;
729 type InstantiateEnv = Deployment;
730
731 fn next_port(&self) -> Self::Port {
732 let next_port = *self.next_port.borrow();
733 *self.next_port.borrow_mut() += 1;
734
735 format!("port_{}", next_port)
736 }
737
738 fn instantiate(
739 &self,
740 env: &mut Self::InstantiateEnv,
741 _meta: &mut Self::Meta,
742 _graph: DfirGraph,
743 _extra_stmts: Vec<syn::Stmt>,
744 ) {
745 let service = env.CustomService(self.host.clone(), vec![]);
746 *self.underlying.borrow_mut() = Some(service);
747 }
748
749 fn update_meta(&mut self, _meta: &Self::Meta) {}
750}
751
752impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
753 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
754 DeployExternal {
755 next_port: Rc::new(RefCell::new(0)),
756 host: self,
757 underlying: Rc::new(RefCell::new(None)),
758 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
759 client_ports: Rc::new(RefCell::new(HashMap::new())),
760 }
761 }
762}
763
764impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
765 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
766 DeployExternal {
767 next_port: Rc::new(RefCell::new(0)),
768 host: self,
769 underlying: Rc::new(RefCell::new(None)),
770 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
771 client_ports: Rc::new(RefCell::new(HashMap::new())),
772 }
773 }
774}
775
776pub enum CrateOrTrybuild {
777 Crate(RustCrate),
778 Trybuild(TrybuildHost),
779}
780
781#[derive(Clone)]
782pub struct DeployNode {
783 id: usize,
784 next_port: Rc<RefCell<usize>>,
785 service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
786 underlying: Rc<RefCell<Option<Arc<RwLock<RustCrateService>>>>>,
787}
788
789impl DeployCrateWrapper for DeployNode {
790 fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
791 self.underlying.borrow().as_ref().unwrap().clone()
792 }
793}
794
795impl Node for DeployNode {
796 type Port = String;
797 type Meta = HashMap<usize, Vec<u32>>;
798 type InstantiateEnv = Deployment;
799
800 fn next_port(&self) -> String {
801 let next_port = *self.next_port.borrow();
802 *self.next_port.borrow_mut() += 1;
803
804 format!("port_{}", next_port)
805 }
806
807 fn update_meta(&mut self, meta: &Self::Meta) {
808 let underlying_node = self.underlying.borrow();
809 let mut n = underlying_node.as_ref().unwrap().try_write().unwrap();
810 n.update_meta(HydroMeta {
811 clusters: meta.clone(),
812 cluster_id: None,
813 subgraph_id: self.id,
814 });
815 }
816
817 fn instantiate(
818 &self,
819 env: &mut Self::InstantiateEnv,
820 _meta: &mut Self::Meta,
821 graph: DfirGraph,
822 extra_stmts: Vec<syn::Stmt>,
823 ) {
824 let service = match self.service_spec.borrow_mut().take().unwrap() {
825 CrateOrTrybuild::Crate(c) => c,
826 CrateOrTrybuild::Trybuild(trybuild) => {
827 let (bin_name, config) =
828 create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint);
829 create_trybuild_service(
830 trybuild,
831 &config.project_dir,
832 &config.target_dir,
833 &config.features,
834 &bin_name,
835 )
836 }
837 };
838
839 *self.underlying.borrow_mut() = Some(env.add_service(service));
840 }
841}
842
843#[derive(Clone)]
844pub struct DeployClusterNode {
845 underlying: Arc<RwLock<RustCrateService>>,
846}
847
848impl DeployCrateWrapper for DeployClusterNode {
849 fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
850 self.underlying.clone()
851 }
852}
853
854#[derive(Clone)]
855pub struct DeployCluster {
856 id: usize,
857 next_port: Rc<RefCell<usize>>,
858 cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
859 members: Rc<RefCell<Vec<DeployClusterNode>>>,
860 name_hint: Option<String>,
861}
862
863impl DeployCluster {
864 pub fn members(&self) -> Vec<DeployClusterNode> {
865 self.members.borrow().clone()
866 }
867}
868
869impl Node for DeployCluster {
870 type Port = String;
871 type Meta = HashMap<usize, Vec<u32>>;
872 type InstantiateEnv = Deployment;
873
874 fn next_port(&self) -> String {
875 let next_port = *self.next_port.borrow();
876 *self.next_port.borrow_mut() += 1;
877
878 format!("port_{}", next_port)
879 }
880
881 fn instantiate(
882 &self,
883 env: &mut Self::InstantiateEnv,
884 meta: &mut Self::Meta,
885 graph: DfirGraph,
886 extra_stmts: Vec<syn::Stmt>,
887 ) {
888 let has_trybuild = self
889 .cluster_spec
890 .borrow()
891 .as_ref()
892 .unwrap()
893 .iter()
894 .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
895
896 let maybe_trybuild = if has_trybuild {
897 Some(create_graph_trybuild(graph, extra_stmts, &self.name_hint))
898 } else {
899 None
900 };
901
902 let cluster_nodes = self
903 .cluster_spec
904 .borrow_mut()
905 .take()
906 .unwrap()
907 .into_iter()
908 .map(|spec| {
909 let service = match spec {
910 CrateOrTrybuild::Crate(c) => c,
911 CrateOrTrybuild::Trybuild(trybuild) => {
912 let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
913 create_trybuild_service(
914 trybuild,
915 &config.project_dir,
916 &config.target_dir,
917 &config.features,
918 bin_name,
919 )
920 }
921 };
922
923 env.add_service(service)
924 })
925 .collect::<Vec<_>>();
926 meta.insert(self.id, (0..(cluster_nodes.len() as u32)).collect());
927 *self.members.borrow_mut() = cluster_nodes
928 .into_iter()
929 .map(|n| DeployClusterNode { underlying: n })
930 .collect();
931 }
932
933 fn update_meta(&mut self, meta: &Self::Meta) {
934 for (cluster_id, node) in self.members.borrow().iter().enumerate() {
935 let mut n = node.underlying.try_write().unwrap();
936 n.update_meta(HydroMeta {
937 clusters: meta.clone(),
938 cluster_id: Some(cluster_id as u32),
939 subgraph_id: self.id,
940 });
941 }
942 }
943}
944
945#[derive(Clone)]
946pub struct DeployProcessSpec(RustCrate);
947
948impl DeployProcessSpec {
949 pub fn new(t: RustCrate) -> Self {
950 Self(t)
951 }
952}
953
954impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
955 fn build(self, id: usize, _name_hint: &str) -> DeployNode {
956 DeployNode {
957 id,
958 next_port: Rc::new(RefCell::new(0)),
959 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0)))),
960 underlying: Rc::new(RefCell::new(None)),
961 }
962 }
963}
964
965impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
966 fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
967 self.name_hint = Some(format!("{} (process {id})", name_hint));
968 DeployNode {
969 id,
970 next_port: Rc::new(RefCell::new(0)),
971 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
972 underlying: Rc::new(RefCell::new(None)),
973 }
974 }
975}
976
977#[derive(Clone)]
978pub struct DeployClusterSpec(Vec<RustCrate>);
979
980impl DeployClusterSpec {
981 pub fn new(crates: Vec<RustCrate>) -> Self {
982 Self(crates)
983 }
984}
985
986impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
987 fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
988 DeployCluster {
989 id,
990 next_port: Rc::new(RefCell::new(0)),
991 cluster_spec: Rc::new(RefCell::new(Some(
992 self.0.into_iter().map(CrateOrTrybuild::Crate).collect(),
993 ))),
994 members: Rc::new(RefCell::new(vec![])),
995 name_hint: None,
996 }
997 }
998}
999
1000impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1001 fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1002 let name_hint = format!("{} (cluster {id})", name_hint);
1003 DeployCluster {
1004 id,
1005 next_port: Rc::new(RefCell::new(0)),
1006 cluster_spec: Rc::new(RefCell::new(Some(
1007 self.into_iter()
1008 .enumerate()
1009 .map(|(idx, b)| {
1010 let mut b = b.into();
1011 b.name_hint = Some(name_hint.clone());
1012 b.cluster_idx = Some(idx);
1013 CrateOrTrybuild::Trybuild(b)
1014 })
1015 .collect(),
1016 ))),
1017 members: Rc::new(RefCell::new(vec![])),
1018 name_hint: Some(name_hint),
1019 }
1020 }
1021}
1022
1023fn create_trybuild_service(
1024 trybuild: TrybuildHost,
1025 dir: &std::path::PathBuf,
1026 target_dir: &std::path::PathBuf,
1027 features: &Option<Vec<String>>,
1028 bin_name: &str,
1029) -> RustCrate {
1030 let mut ret = RustCrate::new(dir, trybuild.host)
1031 .target_dir(target_dir)
1032 .bin(bin_name)
1033 .no_default_features();
1034
1035 if let Some(display_name) = trybuild.display_name {
1036 ret = ret.display_name(display_name);
1037 } else if let Some(name_hint) = trybuild.name_hint {
1038 if let Some(cluster_idx) = trybuild.cluster_idx {
1039 ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1040 } else {
1041 ret = ret.display_name(name_hint);
1042 }
1043 }
1044
1045 if let Some(rustflags) = trybuild.rustflags {
1046 ret = ret.rustflags(rustflags);
1047 }
1048
1049 if let Some(tracing) = trybuild.tracing {
1050 ret = ret.tracing(tracing);
1051 }
1052
1053 ret = ret.features(
1054 trybuild
1055 .additional_hydro_features
1056 .into_iter()
1057 .map(|runtime_feature| {
1058 assert!(
1059 HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1060 "{runtime_feature} is not a valid Hydro runtime feature"
1061 );
1062 format!("hydro___feature_{runtime_feature}")
1063 })
1064 .chain(trybuild.features),
1065 );
1066
1067 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1068 ret = ret.config("build.incremental = false");
1069
1070 if let Some(features) = features {
1071 ret = ret.features(features);
1072 }
1073
1074 ret
1075}