1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use stageleft::{QuotedWithContext, RuntimeData};
25use syn::parse_quote;
26use tokio::sync::RwLock;
27
28use super::deploy_runtime::*;
29use crate::compile::deploy_provider::{
30 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
31};
32use crate::compile::trybuild::generate::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
33use crate::location::NetworkHint;
34use crate::staging_util::get_this_crate;
35
/// Marker type selecting the Hydro Deploy backend.
///
/// The enum has no variants, so it can never be instantiated; it exists only to carry the
/// [`Deploy`] implementation below, whose instantiation environment is a
/// [`hydro_deploy::Deployment`].
pub enum HydroDeploy {}
41
42impl<'a> Deploy<'a> for HydroDeploy {
43 type InstantiateEnv = Deployment;
44 type CompileEnv = ();
45 type Process = DeployNode;
46 type Cluster = DeployCluster;
47 type External = DeployExternal;
48 type Meta = HashMap<usize, Vec<u32>>;
49 type GraphId = ();
50 type Port = String;
51 type ExternalRawPort = CustomClientPort;
52
53 fn allocate_process_port(process: &Self::Process) -> Self::Port {
54 process.next_port()
55 }
56
57 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
58 cluster.next_port()
59 }
60
61 fn allocate_external_port(external: &Self::External) -> Self::Port {
62 external.next_port()
63 }
64
65 fn o2o_sink_source(
66 _env: &(),
67 _p1: &Self::Process,
68 p1_port: &Self::Port,
69 _p2: &Self::Process,
70 p2_port: &Self::Port,
71 ) -> (syn::Expr, syn::Expr) {
72 let p1_port = p1_port.as_str();
73 let p2_port = p2_port.as_str();
74 deploy_o2o(
75 RuntimeData::new("__hydro_lang_trybuild_cli"),
76 p1_port,
77 p2_port,
78 )
79 }
80
    /// Returns a deferred action that wires port `p1_port` of `p1` to port `p2_port` of `p2`.
    ///
    /// Nothing is connected until the returned closure is invoked.
    ///
    /// # Panics
    /// The closure panics if either process has no underlying service yet (it is only set by
    /// `instantiate`) or if a service lock cannot be acquired immediately.
    fn o2o_connect(
        p1: &Self::Process,
        p1_port: &Self::Port,
        p2: &Self::Process,
        p2_port: &Self::Port,
    ) -> Box<dyn FnOnce()> {
        // Clone the handles and names so the boxed closure owns everything it needs.
        let p1 = p1.clone();
        let p1_port = p1_port.clone();
        let p2 = p2.clone();
        let p2_port = p2_port.clone();

        Box::new(move || {
            // Sending side: look up the named port on p1's service. The read guard is a
            // temporary, so it is released at the end of this statement.
            let self_underlying_borrow = p1.underlying.borrow();
            let self_underlying = self_underlying_borrow.as_ref().unwrap();
            let source_port = self_underlying
                .try_read()
                .unwrap()
                .get_port(p1_port.clone(), self_underlying);

            // Receiving side: same lookup on p2's service.
            let other_underlying_borrow = p2.underlying.borrow();
            let other_underlying = other_underlying_borrow.as_ref().unwrap();
            let recipient_port = other_underlying
                .try_read()
                .unwrap()
                .get_port(p2_port.clone(), other_underlying);

            source_port.send_to(&recipient_port)
        })
    }
110
111 fn o2m_sink_source(
112 _env: &(),
113 _p1: &Self::Process,
114 p1_port: &Self::Port,
115 _c2: &Self::Cluster,
116 c2_port: &Self::Port,
117 ) -> (syn::Expr, syn::Expr) {
118 let p1_port = p1_port.as_str();
119 let c2_port = c2_port.as_str();
120 deploy_o2m(
121 RuntimeData::new("__hydro_lang_trybuild_cli"),
122 p1_port,
123 c2_port,
124 )
125 }
126
    /// Returns a deferred action that wires port `p1_port` of process `p1` to port `c2_port`
    /// on every member of cluster `c2`.
    ///
    /// # Panics
    /// The closure panics if `p1` has no underlying service yet or if any service lock
    /// cannot be acquired immediately.
    fn o2m_connect(
        p1: &Self::Process,
        p1_port: &Self::Port,
        c2: &Self::Cluster,
        c2_port: &Self::Port,
    ) -> Box<dyn FnOnce()> {
        // Clone the handles and names so the boxed closure owns everything it needs.
        let p1 = p1.clone();
        let p1_port = p1_port.clone();
        let c2 = c2.clone();
        let c2_port = c2_port.clone();

        Box::new(move || {
            // Sending side: the named port on the process's service.
            let self_underlying_borrow = p1.underlying.borrow();
            let self_underlying = self_underlying_borrow.as_ref().unwrap();
            let source_port = self_underlying
                .try_read()
                .unwrap()
                .get_port(p1_port.clone(), self_underlying);

            // Receiving side: one sink per cluster member, keyed by the member's position
            // in `c2.members`.
            let recipient_port = DemuxSink {
                demux: c2
                    .members
                    .borrow()
                    .iter()
                    .enumerate()
                    .map(|(id, c)| {
                        let n = c.underlying.try_read().unwrap();
                        (
                            id as u32,
                            Arc::new(n.get_port(c2_port.clone(), &c.underlying))
                                as Arc<dyn RustCrateSink + 'static>,
                        )
                    })
                    .collect(),
            };

            source_port.send_to(&recipient_port)
        })
    }
166
167 fn m2o_sink_source(
168 _env: &(),
169 _c1: &Self::Cluster,
170 c1_port: &Self::Port,
171 _p2: &Self::Process,
172 p2_port: &Self::Port,
173 ) -> (syn::Expr, syn::Expr) {
174 let c1_port = c1_port.as_str();
175 let p2_port = p2_port.as_str();
176 deploy_m2o(
177 RuntimeData::new("__hydro_lang_trybuild_cli"),
178 c1_port,
179 p2_port,
180 )
181 }
182
    /// Returns a deferred action that wires port `c1_port` on every member of cluster `c1`
    /// to port `p2_port` of process `p2`.
    ///
    /// # Panics
    /// The closure panics if `p2` has no underlying service yet or if any service lock
    /// cannot be acquired immediately.
    fn m2o_connect(
        c1: &Self::Cluster,
        c1_port: &Self::Port,
        p2: &Self::Process,
        p2_port: &Self::Port,
    ) -> Box<dyn FnOnce()> {
        // Clone the handles and names so the boxed closure owns everything it needs.
        let c1 = c1.clone();
        let c1_port = c1_port.clone();
        let p2 = p2.clone();
        let p2_port = p2_port.clone();

        Box::new(move || {
            // Receiving side: the process port, converted with `merge()`.
            // NOTE(review): presumably `merge()` lets several senders share this port -
            // confirm against hydro_deploy.
            let other_underlying_borrow = p2.underlying.borrow();
            let other_underlying = other_underlying_borrow.as_ref().unwrap();
            let recipient_port = other_underlying
                .try_read()
                .unwrap()
                .get_port(p2_port.clone(), other_underlying)
                .merge();

            // Sending side: each member's port, tagged with the member's position in
            // `c1.members`.
            for (i, node) in c1.members.borrow().iter().enumerate() {
                let source_port = node
                    .underlying
                    .try_read()
                    .unwrap()
                    .get_port(c1_port.clone(), &node.underlying);

                TaggedSource {
                    source: Arc::new(source_port),
                    tag: i as u32,
                }
                .send_to(&recipient_port);
            }
        })
    }
218
219 fn m2m_sink_source(
220 _env: &(),
221 _c1: &Self::Cluster,
222 c1_port: &Self::Port,
223 _c2: &Self::Cluster,
224 c2_port: &Self::Port,
225 ) -> (syn::Expr, syn::Expr) {
226 let c1_port = c1_port.as_str();
227 let c2_port = c2_port.as_str();
228 deploy_m2m(
229 RuntimeData::new("__hydro_lang_trybuild_cli"),
230 c1_port,
231 c2_port,
232 )
233 }
234
    /// Returns a deferred action that wires port `c1_port` on every member of cluster `c1`
    /// to port `c2_port` on every member of cluster `c2`.
    ///
    /// # Panics
    /// The closure panics if any service lock cannot be acquired immediately.
    fn m2m_connect(
        c1: &Self::Cluster,
        c1_port: &Self::Port,
        c2: &Self::Cluster,
        c2_port: &Self::Port,
    ) -> Box<dyn FnOnce()> {
        // Clone the handles and names so the boxed closure owns everything it needs.
        let c1 = c1.clone();
        let c1_port = c1_port.clone();
        let c2 = c2.clone();
        let c2_port = c2_port.clone();

        Box::new(move || {
            for (i, sender) in c1.members.borrow().iter().enumerate() {
                let source_port = sender
                    .underlying
                    .try_read()
                    .unwrap()
                    .get_port(c1_port.clone(), &sender.underlying);

                // A fresh demux over all receivers is built for each sender; every
                // receiver port goes through `merge()`.
                // NOTE(review): presumably `merge()` lets several senders share a
                // receiver port - confirm against hydro_deploy.
                let recipient_port = DemuxSink {
                    demux: c2
                        .members
                        .borrow()
                        .iter()
                        .enumerate()
                        .map(|(id, c)| {
                            let n = c.underlying.try_read().unwrap();
                            (
                                id as u32,
                                Arc::new(n.get_port(c2_port.clone(), &c.underlying).merge())
                                    as Arc<dyn RustCrateSink + 'static>,
                            )
                        })
                        .collect(),
                };

                // Tag the sender with its position in `c1.members`.
                TaggedSource {
                    source: Arc::new(source_port),
                    tag: i as u32,
                }
                .send_to(&recipient_port);
            }
        })
    }
279
280 fn e2o_many_source(
281 _compile_env: &Self::CompileEnv,
282 extra_stmts: &mut Vec<syn::Stmt>,
283 _p2: &Self::Process,
284 p2_port: &Self::Port,
285 codec_type: &syn::Type,
286 shared_handle: String,
287 ) -> syn::Expr {
288 let connect_ident = syn::Ident::new(
289 &format!("__hydro_deploy_many_{}_connect", &shared_handle),
290 Span::call_site(),
291 );
292 let source_ident = syn::Ident::new(
293 &format!("__hydro_deploy_many_{}_source", &shared_handle),
294 Span::call_site(),
295 );
296 let sink_ident = syn::Ident::new(
297 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
298 Span::call_site(),
299 );
300 let membership_ident = syn::Ident::new(
301 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
302 Span::call_site(),
303 );
304
305 let root = get_this_crate();
306
307 extra_stmts.push(syn::parse_quote! {
308 let #connect_ident = __hydro_lang_trybuild_cli
309 .port(#p2_port)
310 .connect::<#root::runtime_support::dfir_rs::util::deploy::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
311 });
312
313 extra_stmts.push(syn::parse_quote! {
314 let #source_ident = #connect_ident.source;
315 });
316
317 extra_stmts.push(syn::parse_quote! {
318 let #sink_ident = #connect_ident.sink;
319 });
320
321 extra_stmts.push(syn::parse_quote! {
322 let #membership_ident = #connect_ident.membership;
323 });
324
325 parse_quote!(#source_ident)
326 }
327
328 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
329 let sink_ident = syn::Ident::new(
330 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
331 Span::call_site(),
332 );
333 parse_quote!(#sink_ident)
334 }
335
336 fn e2o_source(
337 _compile_env: &Self::CompileEnv,
338 extra_stmts: &mut Vec<syn::Stmt>,
339 _p1: &Self::External,
340 _p1_port: &Self::Port,
341 _p2: &Self::Process,
342 p2_port: &Self::Port,
343 codec_type: &syn::Type,
344 shared_handle: String,
345 ) -> syn::Expr {
346 let connect_ident = syn::Ident::new(
347 &format!("__hydro_deploy_{}_connect", &shared_handle),
348 Span::call_site(),
349 );
350 let source_ident = syn::Ident::new(
351 &format!("__hydro_deploy_{}_source", &shared_handle),
352 Span::call_site(),
353 );
354 let sink_ident = syn::Ident::new(
355 &format!("__hydro_deploy_{}_sink", &shared_handle),
356 Span::call_site(),
357 );
358
359 let root = get_this_crate();
360
361 extra_stmts.push(syn::parse_quote! {
362 let #connect_ident = __hydro_lang_trybuild_cli
363 .port(#p2_port)
364 .connect::<#root::runtime_support::dfir_rs::util::deploy::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
365 });
366
367 extra_stmts.push(syn::parse_quote! {
368 let #source_ident = #connect_ident.source;
369 });
370
371 extra_stmts.push(syn::parse_quote! {
372 let #sink_ident = #connect_ident.sink;
373 });
374
375 parse_quote!(#source_ident)
376 }
377
    /// Returns a deferred action that connects external `p1` to port `p2_port` of `p2`.
    ///
    /// When run, the closure declares a client port on the external service, sends it to
    /// the process port (looked up with `server_hint` translated to a
    /// `hydro_deploy::PortNetworkHint`), and stores the client port in `p1.client_ports`
    /// under `p1_port`. The `_many` flag is not consulted.
    ///
    /// # Panics
    /// The closure panics if either side has no underlying service yet or if a service lock
    /// cannot be acquired immediately.
    fn e2o_connect(
        p1: &Self::External,
        p1_port: &Self::Port,
        p2: &Self::Process,
        p2_port: &Self::Port,
        _many: bool,
        server_hint: NetworkHint,
    ) -> Box<dyn FnOnce()> {
        // Clone the handles and names so the boxed closure owns everything it needs.
        let p1 = p1.clone();
        let p1_port = p1_port.clone();
        let p2 = p2.clone();
        let p2_port = p2_port.clone();

        Box::new(move || {
            // External side: always declared through `declare_many_client`.
            let self_underlying_borrow = p1.underlying.borrow();
            let self_underlying = self_underlying_borrow.as_ref().unwrap();
            let source_port = self_underlying
                .try_read()
                .unwrap()
                .declare_many_client(self_underlying);

            // Process side: the named port, with the network hint mapped variant-by-variant.
            let other_underlying_borrow = p2.underlying.borrow();
            let other_underlying = other_underlying_borrow.as_ref().unwrap();
            let recipient_port = other_underlying.try_read().unwrap().get_port_with_hint(
                p2_port.clone(),
                match server_hint {
                    NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
                    NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
                },
                other_underlying,
            );

            source_port.send_to(&recipient_port);

            // Keep the client port so `raw_port` can hand it out later.
            p1.client_ports
                .borrow_mut()
                .insert(p1_port.clone(), source_port);
        })
    }
417
418 fn o2e_sink(
419 _compile_env: &Self::CompileEnv,
420 _p1: &Self::Process,
421 _p1_port: &Self::Port,
422 _p2: &Self::External,
423 _p2_port: &Self::Port,
424 shared_handle: String,
425 ) -> syn::Expr {
426 let sink_ident = syn::Ident::new(
427 &format!("__hydro_deploy_{}_sink", &shared_handle),
428 Span::call_site(),
429 );
430 parse_quote!(#sink_ident)
431 }
432
433 fn cluster_ids(
434 _env: &Self::CompileEnv,
435 of_cluster: usize,
436 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
437 cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
438 }
439
440 fn cluster_self_id(_env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
441 cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
442 }
443}
444
445#[expect(missing_docs, reason = "TODO")]
446pub trait DeployCrateWrapper {
447 fn underlying(&self) -> Arc<RwLock<RustCrateService>>;
448
449 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
450 async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
451 self.underlying().read().await.stdout()
452 }
453
454 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
455 async fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
456 self.underlying().read().await.stderr()
457 }
458
459 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
460 async fn stdout_filter(
461 &self,
462 prefix: impl Into<String>,
463 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
464 self.underlying().read().await.stdout_filter(prefix.into())
465 }
466
467 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
468 async fn stderr_filter(
469 &self,
470 prefix: impl Into<String>,
471 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
472 self.underlying().read().await.stderr_filter(prefix.into())
473 }
474
475 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
476 async fn tracing_results(&self) -> Option<TracingResults> {
477 self.underlying().read().await.tracing_results().cloned()
478 }
479}
480
// A host plus the build options used when a trybuild-generated binary is deployed on it.
// (Plain `//` comments are used so the `missing_docs` expectation below stays fulfilled.)
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct TrybuildHost {
    // Host the generated crate is deployed on.
    host: Arc<dyn Host>,
    // Explicit display name; when set it takes precedence over `name_hint`.
    display_name: Option<String>,
    // Forwarded to `RustCrate::rustflags` when set.
    rustflags: Option<String>,
    // Hydro runtime features; each is checked against `HYDRO_RUNTIME_FEATURES` and enabled
    // as `hydro___feature_<name>`.
    additional_hydro_features: Vec<String>,
    // Extra Cargo features, passed through unchanged.
    features: Vec<String>,
    // Forwarded to `RustCrate::tracing` when set.
    tracing: Option<TracingOptions>,
    // Key/value pairs forwarded one by one to `RustCrate::build_env`.
    build_envs: Vec<(String, String)>,
    // Filled in by the `ProcessSpec`/`ClusterSpec` `build` implementations; used as the
    // display name when `display_name` is unset.
    name_hint: Option<String>,
    // Position within a cluster, set by `ClusterSpec::build`; appended to the name hint.
    cluster_idx: Option<usize>,
}
494
495impl From<Arc<dyn Host>> for TrybuildHost {
496 fn from(host: Arc<dyn Host>) -> Self {
497 Self {
498 host,
499 display_name: None,
500 rustflags: None,
501 additional_hydro_features: vec![],
502 features: vec![],
503 tracing: None,
504 build_envs: vec![],
505 name_hint: None,
506 cluster_idx: None,
507 }
508 }
509}
510
511impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
512 fn from(host: Arc<H>) -> Self {
513 Self {
514 host,
515 display_name: None,
516 rustflags: None,
517 additional_hydro_features: vec![],
518 features: vec![],
519 tracing: None,
520 build_envs: vec![],
521 name_hint: None,
522 cluster_idx: None,
523 }
524 }
525}
526
527#[expect(missing_docs, reason = "TODO")]
528impl TrybuildHost {
529 pub fn new(host: Arc<dyn Host>) -> Self {
530 Self {
531 host,
532 display_name: None,
533 rustflags: None,
534 additional_hydro_features: vec![],
535 features: vec![],
536 tracing: None,
537 build_envs: vec![],
538 name_hint: None,
539 cluster_idx: None,
540 }
541 }
542
543 pub fn display_name(self, display_name: impl Into<String>) -> Self {
544 if self.display_name.is_some() {
545 panic!("{} already set", name_of!(display_name in Self));
546 }
547
548 Self {
549 display_name: Some(display_name.into()),
550 ..self
551 }
552 }
553
554 pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
555 if self.rustflags.is_some() {
556 panic!("{} already set", name_of!(rustflags in Self));
557 }
558
559 Self {
560 rustflags: Some(rustflags.into()),
561 ..self
562 }
563 }
564
565 pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
566 Self {
567 additional_hydro_features,
568 ..self
569 }
570 }
571
572 pub fn features(self, features: Vec<String>) -> Self {
573 Self {
574 features: self.features.into_iter().chain(features).collect(),
575 ..self
576 }
577 }
578
579 pub fn tracing(self, tracing: TracingOptions) -> Self {
580 if self.tracing.is_some() {
581 panic!("{} already set", name_of!(tracing in Self));
582 }
583
584 Self {
585 tracing: Some(tracing),
586 ..self
587 }
588 }
589
590 pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
591 Self {
592 build_envs: self
593 .build_envs
594 .into_iter()
595 .chain(std::iter::once((key.into(), value.into())))
596 .collect(),
597 ..self
598 }
599 }
600}
601
602impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
603 type ProcessSpec = TrybuildHost;
604 fn into_process_spec(self) -> TrybuildHost {
605 TrybuildHost {
606 host: self,
607 display_name: None,
608 rustflags: None,
609 additional_hydro_features: vec![],
610 features: vec![],
611 tracing: None,
612 build_envs: vec![],
613 name_hint: None,
614 cluster_idx: None,
615 }
616 }
617}
618
619impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
620 type ProcessSpec = TrybuildHost;
621 fn into_process_spec(self) -> TrybuildHost {
622 TrybuildHost {
623 host: self,
624 display_name: None,
625 rustflags: None,
626 additional_hydro_features: vec![],
627 features: vec![],
628 tracing: None,
629 build_envs: vec![],
630 name_hint: None,
631 cluster_idx: None,
632 }
633 }
634}
635
// Handle for an external client of the deployment. Cloning shares all state, since every
// mutable field sits behind an `Rc`.
// (Plain `//` comments are used so the `missing_docs` expectation below stays fulfilled.)
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployExternal {
    // Counter behind `next_port`; the n-th allocation is named `port_<n>`.
    next_port: Rc<RefCell<usize>>,
    // Host the custom service is created on during `instantiate`.
    host: Arc<dyn Host>,
    // The custom service; `None` until `instantiate` has run.
    underlying: Rc<RefCell<Option<Arc<RwLock<CustomService>>>>>,
    // Client ports recorded by `e2o_connect`, keyed by port name.
    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
    // Port names recorded by `register`, keyed by external port key.
    allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
}
645
646impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
647 fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
648 assert!(
649 self.allocated_ports
650 .borrow_mut()
651 .insert(key, port.clone())
652 .is_none_or(|old| old == port)
653 );
654 }
655
656 fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy<'_>>::ExternalRawPort {
657 self.client_ports
658 .borrow()
659 .get(self.allocated_ports.borrow().get(&key).unwrap())
660 .unwrap()
661 .clone()
662 }
663
664 fn as_bytes_bidi(
665 &self,
666 key: usize,
667 ) -> impl Future<
668 Output = (
669 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
670 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
671 ),
672 > + 'a {
673 let port = self.raw_port(key);
674
675 async move {
676 let (source, sink) = port.connect().await.into_source_sink();
677 (
678 Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
679 Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
680 )
681 }
682 }
683
684 fn as_bincode_bidi<InT, OutT>(
685 &self,
686 key: usize,
687 ) -> impl Future<
688 Output = (
689 Pin<Box<dyn Stream<Item = OutT>>>,
690 Pin<Box<dyn Sink<InT, Error = Error>>>,
691 ),
692 > + 'a
693 where
694 InT: Serialize + 'static,
695 OutT: DeserializeOwned + 'static,
696 {
697 let port = self.raw_port(key);
698 async move {
699 let (source, sink) = port.connect().await.into_source_sink();
700 (
701 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
702 as Pin<Box<dyn Stream<Item = OutT>>>,
703 Box::pin(
704 sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
705 ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
706 )
707 }
708 }
709
710 fn as_bincode_sink<T: Serialize + 'static>(
711 &self,
712 key: usize,
713 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
714 let port = self.raw_port(key);
715 async move {
716 let sink = port.connect().await.into_sink();
717 Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
718 as Pin<Box<dyn Sink<T, Error = Error>>>
719 }
720 }
721
722 fn as_bincode_source<T: DeserializeOwned + 'static>(
723 &self,
724 key: usize,
725 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
726 let port = self.raw_port(key);
727 async move {
728 let source = port.connect().await.into_source();
729 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
730 as Pin<Box<dyn Stream<Item = T>>>
731 }
732 }
733}
734
735impl Node for DeployExternal {
736 type Port = String;
737 type Meta = HashMap<usize, Vec<u32>>;
738 type InstantiateEnv = Deployment;
739
740 fn next_port(&self) -> Self::Port {
741 let next_port = *self.next_port.borrow();
742 *self.next_port.borrow_mut() += 1;
743
744 format!("port_{}", next_port)
745 }
746
747 fn instantiate(
748 &self,
749 env: &mut Self::InstantiateEnv,
750 _meta: &mut Self::Meta,
751 _graph: DfirGraph,
752 _extra_stmts: Vec<syn::Stmt>,
753 ) {
754 let service = env.CustomService(self.host.clone(), vec![]);
755 *self.underlying.borrow_mut() = Some(service);
756 }
757
758 fn update_meta(&mut self, _meta: &Self::Meta) {}
759}
760
761impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
762 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
763 DeployExternal {
764 next_port: Rc::new(RefCell::new(0)),
765 host: self,
766 underlying: Rc::new(RefCell::new(None)),
767 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
768 client_ports: Rc::new(RefCell::new(HashMap::new())),
769 }
770 }
771}
772
773impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
774 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
775 DeployExternal {
776 next_port: Rc::new(RefCell::new(0)),
777 host: self,
778 underlying: Rc::new(RefCell::new(None)),
779 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
780 client_ports: Rc::new(RefCell::new(HashMap::new())),
781 }
782 }
783}
784
/// How the service for a process or cluster member is specified before instantiation.
pub(crate) enum CrateOrTrybuild {
    /// A ready-made `RustCrate`, added to the deployment as-is.
    Crate(RustCrate),
    /// A host description; at instantiation the crate is generated with
    /// `create_graph_trybuild` and configured by `create_trybuild_service`.
    Trybuild(TrybuildHost),
}
789
// Handle for a single process in the deployment. Cloning shares all state, since every
// mutable field sits behind an `Rc`.
// (Plain `//` comments are used so the `missing_docs` expectation below stays fulfilled.)
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployNode {
    // Id passed to `ProcessSpec::build`; reported as `subgraph_id` in `update_meta`.
    id: usize,
    // Counter behind `next_port`; the n-th allocation is named `port_<n>`.
    next_port: Rc<RefCell<usize>>,
    // Service specification; taken (left as `None`) by `instantiate`.
    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
    // The deployed service; `None` until `instantiate` has run.
    underlying: Rc<RefCell<Option<Arc<RwLock<RustCrateService>>>>>,
}
798
799impl DeployCrateWrapper for DeployNode {
800 fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
801 self.underlying.borrow().as_ref().unwrap().clone()
802 }
803}
804
805impl Node for DeployNode {
806 type Port = String;
807 type Meta = HashMap<usize, Vec<u32>>;
808 type InstantiateEnv = Deployment;
809
810 fn next_port(&self) -> String {
811 let next_port = *self.next_port.borrow();
812 *self.next_port.borrow_mut() += 1;
813
814 format!("port_{}", next_port)
815 }
816
817 fn update_meta(&mut self, meta: &Self::Meta) {
818 let underlying_node = self.underlying.borrow();
819 let mut n = underlying_node.as_ref().unwrap().try_write().unwrap();
820 n.update_meta(HydroMeta {
821 clusters: meta.clone(),
822 cluster_id: None,
823 subgraph_id: self.id,
824 });
825 }
826
827 fn instantiate(
828 &self,
829 env: &mut Self::InstantiateEnv,
830 _meta: &mut Self::Meta,
831 graph: DfirGraph,
832 extra_stmts: Vec<syn::Stmt>,
833 ) {
834 let service = match self.service_spec.borrow_mut().take().unwrap() {
835 CrateOrTrybuild::Crate(c) => c,
836 CrateOrTrybuild::Trybuild(trybuild) => {
837 let (bin_name, config) =
838 create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint);
839 create_trybuild_service(
840 trybuild,
841 &config.project_dir,
842 &config.target_dir,
843 &config.features,
844 &bin_name,
845 )
846 }
847 };
848
849 *self.underlying.borrow_mut() = Some(env.add_service(service));
850 }
851}
852
// Handle for one instantiated member of a cluster.
// (Plain `//` comments are used so the `missing_docs` expectation below stays fulfilled.)
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployClusterNode {
    // The member's deployed service, as returned by `Deployment::add_service`.
    underlying: Arc<RwLock<RustCrateService>>,
}
858
859impl DeployCrateWrapper for DeployClusterNode {
860 fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
861 self.underlying.clone()
862 }
863}
// Handle for a cluster in the deployment. Cloning shares the port counter, spec and member
// list, since those fields sit behind an `Rc`.
// (Plain `//` comments are used so the `missing_docs` expectation below stays fulfilled.)
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployCluster {
    // Id passed to `ClusterSpec::build`; used as the key in the cluster metadata map and
    // reported as `subgraph_id` in `update_meta`.
    id: usize,
    // Counter behind `next_port`; the n-th allocation is named `port_<n>`.
    next_port: Rc<RefCell<usize>>,
    // One spec per member; taken (left as `None`) by `instantiate`.
    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
    // Instantiated members, in spec order; empty until `instantiate` has run.
    members: Rc<RefCell<Vec<DeployClusterNode>>>,
    // Passed to `create_graph_trybuild` when any member is a trybuild spec.
    name_hint: Option<String>,
}
873
874impl DeployCluster {
875 #[expect(missing_docs, reason = "TODO")]
876 pub fn members(&self) -> Vec<DeployClusterNode> {
877 self.members.borrow().clone()
878 }
879}
880
881impl Node for DeployCluster {
882 type Port = String;
883 type Meta = HashMap<usize, Vec<u32>>;
884 type InstantiateEnv = Deployment;
885
886 fn next_port(&self) -> String {
887 let next_port = *self.next_port.borrow();
888 *self.next_port.borrow_mut() += 1;
889
890 format!("port_{}", next_port)
891 }
892
893 fn instantiate(
894 &self,
895 env: &mut Self::InstantiateEnv,
896 meta: &mut Self::Meta,
897 graph: DfirGraph,
898 extra_stmts: Vec<syn::Stmt>,
899 ) {
900 let has_trybuild = self
901 .cluster_spec
902 .borrow()
903 .as_ref()
904 .unwrap()
905 .iter()
906 .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
907
908 let maybe_trybuild = if has_trybuild {
909 Some(create_graph_trybuild(graph, extra_stmts, &self.name_hint))
910 } else {
911 None
912 };
913
914 let cluster_nodes = self
915 .cluster_spec
916 .borrow_mut()
917 .take()
918 .unwrap()
919 .into_iter()
920 .map(|spec| {
921 let service = match spec {
922 CrateOrTrybuild::Crate(c) => c,
923 CrateOrTrybuild::Trybuild(trybuild) => {
924 let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
925 create_trybuild_service(
926 trybuild,
927 &config.project_dir,
928 &config.target_dir,
929 &config.features,
930 bin_name,
931 )
932 }
933 };
934
935 env.add_service(service)
936 })
937 .collect::<Vec<_>>();
938 meta.insert(self.id, (0..(cluster_nodes.len() as u32)).collect());
939 *self.members.borrow_mut() = cluster_nodes
940 .into_iter()
941 .map(|n| DeployClusterNode { underlying: n })
942 .collect();
943 }
944
945 fn update_meta(&mut self, meta: &Self::Meta) {
946 for (cluster_id, node) in self.members.borrow().iter().enumerate() {
947 let mut n = node.underlying.try_write().unwrap();
948 n.update_meta(HydroMeta {
949 clusters: meta.clone(),
950 cluster_id: Some(cluster_id as u32),
951 subgraph_id: self.id,
952 });
953 }
954 }
955}
956
// Process specification that wraps a ready-made `RustCrate`.
// (Plain `//` comment so the `missing_docs` expectation below stays fulfilled.)
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployProcessSpec(RustCrate);
960
961impl DeployProcessSpec {
962 #[expect(missing_docs, reason = "TODO")]
963 pub fn new(t: RustCrate) -> Self {
964 Self(t)
965 }
966}
967
968impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
969 fn build(self, id: usize, _name_hint: &str) -> DeployNode {
970 DeployNode {
971 id,
972 next_port: Rc::new(RefCell::new(0)),
973 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0)))),
974 underlying: Rc::new(RefCell::new(None)),
975 }
976 }
977}
978
979impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
980 fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
981 self.name_hint = Some(format!("{} (process {id})", name_hint));
982 DeployNode {
983 id,
984 next_port: Rc::new(RefCell::new(0)),
985 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
986 underlying: Rc::new(RefCell::new(None)),
987 }
988 }
989}
990
// Cluster specification made of ready-made `RustCrate`s, one per member.
// (Plain `//` comment so the `missing_docs` expectation below stays fulfilled.)
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployClusterSpec(Vec<RustCrate>);
994
995impl DeployClusterSpec {
996 #[expect(missing_docs, reason = "TODO")]
997 pub fn new(crates: Vec<RustCrate>) -> Self {
998 Self(crates)
999 }
1000}
1001
1002impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1003 fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
1004 DeployCluster {
1005 id,
1006 next_port: Rc::new(RefCell::new(0)),
1007 cluster_spec: Rc::new(RefCell::new(Some(
1008 self.0.into_iter().map(CrateOrTrybuild::Crate).collect(),
1009 ))),
1010 members: Rc::new(RefCell::new(vec![])),
1011 name_hint: None,
1012 }
1013 }
1014}
1015
1016impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1017 fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1018 let name_hint = format!("{} (cluster {id})", name_hint);
1019 DeployCluster {
1020 id,
1021 next_port: Rc::new(RefCell::new(0)),
1022 cluster_spec: Rc::new(RefCell::new(Some(
1023 self.into_iter()
1024 .enumerate()
1025 .map(|(idx, b)| {
1026 let mut b = b.into();
1027 b.name_hint = Some(name_hint.clone());
1028 b.cluster_idx = Some(idx);
1029 CrateOrTrybuild::Trybuild(b)
1030 })
1031 .collect(),
1032 ))),
1033 members: Rc::new(RefCell::new(vec![])),
1034 name_hint: Some(name_hint),
1035 }
1036 }
1037}
1038
1039fn create_trybuild_service(
1040 trybuild: TrybuildHost,
1041 dir: &std::path::PathBuf,
1042 target_dir: &std::path::PathBuf,
1043 features: &Option<Vec<String>>,
1044 bin_name: &str,
1045) -> RustCrate {
1046 let mut ret = RustCrate::new(dir, trybuild.host)
1047 .target_dir(target_dir)
1048 .example(bin_name)
1049 .no_default_features();
1050
1051 if let Some(display_name) = trybuild.display_name {
1052 ret = ret.display_name(display_name);
1053 } else if let Some(name_hint) = trybuild.name_hint {
1054 if let Some(cluster_idx) = trybuild.cluster_idx {
1055 ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1056 } else {
1057 ret = ret.display_name(name_hint);
1058 }
1059 }
1060
1061 if let Some(rustflags) = trybuild.rustflags {
1062 ret = ret.rustflags(rustflags);
1063 }
1064
1065 if let Some(tracing) = trybuild.tracing {
1066 ret = ret.tracing(tracing);
1067 }
1068
1069 ret = ret.features(
1070 vec!["hydro___feature_deploy_integration".to_string()]
1071 .into_iter()
1072 .chain(
1073 trybuild
1074 .additional_hydro_features
1075 .into_iter()
1076 .map(|runtime_feature| {
1077 assert!(
1078 HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1079 "{runtime_feature} is not a valid Hydro runtime feature"
1080 );
1081 format!("hydro___feature_{runtime_feature}")
1082 }),
1083 )
1084 .chain(trybuild.features),
1085 );
1086
1087 for (key, value) in trybuild.build_envs {
1088 ret = ret.build_env(key, value);
1089 }
1090
1091 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1092 ret = ret.config("build.incremental = false");
1093
1094 if let Some(features) = features {
1095 ret = ret.features(features);
1096 }
1097
1098 ret
1099}