1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use stageleft::{QuotedWithContext, RuntimeData};
25use syn::parse_quote;
26use tokio::sync::RwLock;
27
28use super::deploy_runtime::*;
29use super::trybuild::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
30use crate::compile::deploy_provider::{
31 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
32};
33use crate::location::NetworkHint;
34use crate::staging_util::get_this_crate;
35
36pub enum HydroDeploy {}
41
42impl<'a> Deploy<'a> for HydroDeploy {
43 type InstantiateEnv = Deployment;
44 type CompileEnv = ();
45 type Process = DeployNode;
46 type Cluster = DeployCluster;
47 type External = DeployExternal;
48 type Meta = HashMap<usize, Vec<u32>>;
49 type GraphId = ();
50 type Port = String;
51 type ExternalRawPort = CustomClientPort;
52
53 fn allocate_process_port(process: &Self::Process) -> Self::Port {
54 process.next_port()
55 }
56
57 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
58 cluster.next_port()
59 }
60
61 fn allocate_external_port(external: &Self::External) -> Self::Port {
62 external.next_port()
63 }
64
65 fn o2o_sink_source(
66 _env: &(),
67 _p1: &Self::Process,
68 p1_port: &Self::Port,
69 _p2: &Self::Process,
70 p2_port: &Self::Port,
71 ) -> (syn::Expr, syn::Expr) {
72 let p1_port = p1_port.as_str();
73 let p2_port = p2_port.as_str();
74 deploy_o2o(
75 RuntimeData::new("__hydro_lang_trybuild_cli"),
76 p1_port,
77 p2_port,
78 )
79 }
80
81 fn o2o_connect(
82 p1: &Self::Process,
83 p1_port: &Self::Port,
84 p2: &Self::Process,
85 p2_port: &Self::Port,
86 ) -> Box<dyn FnOnce()> {
87 let p1 = p1.clone();
88 let p1_port = p1_port.clone();
89 let p2 = p2.clone();
90 let p2_port = p2_port.clone();
91
92 Box::new(move || {
93 let self_underlying_borrow = p1.underlying.borrow();
94 let self_underlying = self_underlying_borrow.as_ref().unwrap();
95 let source_port = self_underlying
96 .try_read()
97 .unwrap()
98 .get_port(p1_port.clone(), self_underlying);
99
100 let other_underlying_borrow = p2.underlying.borrow();
101 let other_underlying = other_underlying_borrow.as_ref().unwrap();
102 let recipient_port = other_underlying
103 .try_read()
104 .unwrap()
105 .get_port(p2_port.clone(), other_underlying);
106
107 source_port.send_to(&recipient_port)
108 })
109 }
110
111 fn o2m_sink_source(
112 _env: &(),
113 _p1: &Self::Process,
114 p1_port: &Self::Port,
115 _c2: &Self::Cluster,
116 c2_port: &Self::Port,
117 ) -> (syn::Expr, syn::Expr) {
118 let p1_port = p1_port.as_str();
119 let c2_port = c2_port.as_str();
120 deploy_o2m(
121 RuntimeData::new("__hydro_lang_trybuild_cli"),
122 p1_port,
123 c2_port,
124 )
125 }
126
127 fn o2m_connect(
128 p1: &Self::Process,
129 p1_port: &Self::Port,
130 c2: &Self::Cluster,
131 c2_port: &Self::Port,
132 ) -> Box<dyn FnOnce()> {
133 let p1 = p1.clone();
134 let p1_port = p1_port.clone();
135 let c2 = c2.clone();
136 let c2_port = c2_port.clone();
137
138 Box::new(move || {
139 let self_underlying_borrow = p1.underlying.borrow();
140 let self_underlying = self_underlying_borrow.as_ref().unwrap();
141 let source_port = self_underlying
142 .try_read()
143 .unwrap()
144 .get_port(p1_port.clone(), self_underlying);
145
146 let recipient_port = DemuxSink {
147 demux: c2
148 .members
149 .borrow()
150 .iter()
151 .enumerate()
152 .map(|(id, c)| {
153 let n = c.underlying.try_read().unwrap();
154 (
155 id as u32,
156 Arc::new(n.get_port(c2_port.clone(), &c.underlying))
157 as Arc<dyn RustCrateSink + 'static>,
158 )
159 })
160 .collect(),
161 };
162
163 source_port.send_to(&recipient_port)
164 })
165 }
166
167 fn m2o_sink_source(
168 _env: &(),
169 _c1: &Self::Cluster,
170 c1_port: &Self::Port,
171 _p2: &Self::Process,
172 p2_port: &Self::Port,
173 ) -> (syn::Expr, syn::Expr) {
174 let c1_port = c1_port.as_str();
175 let p2_port = p2_port.as_str();
176 deploy_m2o(
177 RuntimeData::new("__hydro_lang_trybuild_cli"),
178 c1_port,
179 p2_port,
180 )
181 }
182
183 fn m2o_connect(
184 c1: &Self::Cluster,
185 c1_port: &Self::Port,
186 p2: &Self::Process,
187 p2_port: &Self::Port,
188 ) -> Box<dyn FnOnce()> {
189 let c1 = c1.clone();
190 let c1_port = c1_port.clone();
191 let p2 = p2.clone();
192 let p2_port = p2_port.clone();
193
194 Box::new(move || {
195 let other_underlying_borrow = p2.underlying.borrow();
196 let other_underlying = other_underlying_borrow.as_ref().unwrap();
197 let recipient_port = other_underlying
198 .try_read()
199 .unwrap()
200 .get_port(p2_port.clone(), other_underlying)
201 .merge();
202
203 for (i, node) in c1.members.borrow().iter().enumerate() {
204 let source_port = node
205 .underlying
206 .try_read()
207 .unwrap()
208 .get_port(c1_port.clone(), &node.underlying);
209
210 TaggedSource {
211 source: Arc::new(source_port),
212 tag: i as u32,
213 }
214 .send_to(&recipient_port);
215 }
216 })
217 }
218
219 fn m2m_sink_source(
220 _env: &(),
221 _c1: &Self::Cluster,
222 c1_port: &Self::Port,
223 _c2: &Self::Cluster,
224 c2_port: &Self::Port,
225 ) -> (syn::Expr, syn::Expr) {
226 let c1_port = c1_port.as_str();
227 let c2_port = c2_port.as_str();
228 deploy_m2m(
229 RuntimeData::new("__hydro_lang_trybuild_cli"),
230 c1_port,
231 c2_port,
232 )
233 }
234
235 fn m2m_connect(
236 c1: &Self::Cluster,
237 c1_port: &Self::Port,
238 c2: &Self::Cluster,
239 c2_port: &Self::Port,
240 ) -> Box<dyn FnOnce()> {
241 let c1 = c1.clone();
242 let c1_port = c1_port.clone();
243 let c2 = c2.clone();
244 let c2_port = c2_port.clone();
245
246 Box::new(move || {
247 for (i, sender) in c1.members.borrow().iter().enumerate() {
248 let source_port = sender
249 .underlying
250 .try_read()
251 .unwrap()
252 .get_port(c1_port.clone(), &sender.underlying);
253
254 let recipient_port = DemuxSink {
255 demux: c2
256 .members
257 .borrow()
258 .iter()
259 .enumerate()
260 .map(|(id, c)| {
261 let n = c.underlying.try_read().unwrap();
262 (
263 id as u32,
264 Arc::new(n.get_port(c2_port.clone(), &c.underlying).merge())
265 as Arc<dyn RustCrateSink + 'static>,
266 )
267 })
268 .collect(),
269 };
270
271 TaggedSource {
272 source: Arc::new(source_port),
273 tag: i as u32,
274 }
275 .send_to(&recipient_port);
276 }
277 })
278 }
279
280 fn e2o_many_source(
281 _compile_env: &Self::CompileEnv,
282 extra_stmts: &mut Vec<syn::Stmt>,
283 _p2: &Self::Process,
284 p2_port: &Self::Port,
285 codec_type: &syn::Type,
286 shared_handle: String,
287 ) -> syn::Expr {
288 let connect_ident = syn::Ident::new(
289 &format!("__hydro_deploy_many_{}_connect", &shared_handle),
290 Span::call_site(),
291 );
292 let source_ident = syn::Ident::new(
293 &format!("__hydro_deploy_many_{}_source", &shared_handle),
294 Span::call_site(),
295 );
296 let sink_ident = syn::Ident::new(
297 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
298 Span::call_site(),
299 );
300 let membership_ident = syn::Ident::new(
301 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
302 Span::call_site(),
303 );
304
305 let root = get_this_crate();
306
307 extra_stmts.push(syn::parse_quote! {
308 let #connect_ident = __hydro_lang_trybuild_cli
309 .port(#p2_port)
310 .connect::<#root::runtime_support::dfir_rs::util::deploy::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
311 });
312
313 extra_stmts.push(syn::parse_quote! {
314 let #source_ident = #connect_ident.source;
315 });
316
317 extra_stmts.push(syn::parse_quote! {
318 let #sink_ident = #connect_ident.sink;
319 });
320
321 extra_stmts.push(syn::parse_quote! {
322 let #membership_ident = #connect_ident.membership;
323 });
324
325 parse_quote!(#source_ident)
326 }
327
328 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
329 let sink_ident = syn::Ident::new(
330 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
331 Span::call_site(),
332 );
333 parse_quote!(#sink_ident)
334 }
335
336 fn e2o_source(
337 _compile_env: &Self::CompileEnv,
338 _p1: &Self::External,
339 p1_port: &Self::Port,
340 _p2: &Self::Process,
341 p2_port: &Self::Port,
342 ) -> syn::Expr {
343 let p1_port = p1_port.as_str();
344 let p2_port = p2_port.as_str();
345 deploy_e2o(
346 RuntimeData::new("__hydro_lang_trybuild_cli"),
347 p1_port,
348 p2_port,
349 )
350 }
351
352 fn e2o_connect(
353 p1: &Self::External,
354 p1_port: &Self::Port,
355 p2: &Self::Process,
356 p2_port: &Self::Port,
357 many: bool,
358 server_hint: NetworkHint,
359 ) -> Box<dyn FnOnce()> {
360 let p1 = p1.clone();
361 let p1_port = p1_port.clone();
362 let p2 = p2.clone();
363 let p2_port = p2_port.clone();
364
365 Box::new(move || {
366 let self_underlying_borrow = p1.underlying.borrow();
367 let self_underlying = self_underlying_borrow.as_ref().unwrap();
368 let source_port = if many {
369 self_underlying
370 .try_read()
371 .unwrap()
372 .declare_many_client(self_underlying)
373 } else {
374 self_underlying
375 .try_read()
376 .unwrap()
377 .declare_client(self_underlying)
378 };
379
380 let other_underlying_borrow = p2.underlying.borrow();
381 let other_underlying = other_underlying_borrow.as_ref().unwrap();
382 let recipient_port = other_underlying.try_read().unwrap().get_port_with_hint(
383 p2_port.clone(),
384 match server_hint {
385 NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
386 NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
387 },
388 other_underlying,
389 );
390
391 source_port.send_to(&recipient_port);
392
393 p1.client_ports
394 .borrow_mut()
395 .insert(p1_port.clone(), source_port);
396 })
397 }
398
399 fn o2e_sink(
400 _compile_env: &Self::CompileEnv,
401 _p1: &Self::Process,
402 p1_port: &Self::Port,
403 _p2: &Self::External,
404 p2_port: &Self::Port,
405 ) -> syn::Expr {
406 let p1_port = p1_port.as_str();
407 let p2_port = p2_port.as_str();
408 deploy_o2e(
409 RuntimeData::new("__hydro_lang_trybuild_cli"),
410 p1_port,
411 p2_port,
412 )
413 }
414
415 fn o2e_connect(
416 p1: &Self::Process,
417 p1_port: &Self::Port,
418 p2: &Self::External,
419 p2_port: &Self::Port,
420 ) -> Box<dyn FnOnce()> {
421 let p1 = p1.clone();
422 let p1_port = p1_port.clone();
423 let p2 = p2.clone();
424 let p2_port = p2_port.clone();
425
426 Box::new(move || {
427 let self_underlying_borrow = p1.underlying.borrow();
428 let self_underlying = self_underlying_borrow.as_ref().unwrap();
429 let source_port = self_underlying
430 .try_read()
431 .unwrap()
432 .get_port(p1_port.clone(), self_underlying);
433
434 let other_underlying_borrow = p2.underlying.borrow();
435 let other_underlying = other_underlying_borrow.as_ref().unwrap();
436 let recipient_port = other_underlying
437 .try_read()
438 .unwrap()
439 .declare_client(other_underlying);
440
441 source_port.send_to(&recipient_port);
442
443 p2.client_ports
444 .borrow_mut()
445 .insert(p2_port.clone(), recipient_port);
446 })
447 }
448
449 fn cluster_ids(
450 _env: &Self::CompileEnv,
451 of_cluster: usize,
452 ) -> impl QuotedWithContext<'a, &'a [u32], ()> + Copy + 'a {
453 cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
454 }
455
456 fn cluster_self_id(_env: &Self::CompileEnv) -> impl QuotedWithContext<'a, u32, ()> + Copy + 'a {
457 cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
458 }
459}
460
461#[expect(missing_docs, reason = "TODO")]
462pub trait DeployCrateWrapper {
463 fn underlying(&self) -> Arc<RwLock<RustCrateService>>;
464
465 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
466 async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
467 self.underlying().read().await.stdout()
468 }
469
470 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
471 async fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
472 self.underlying().read().await.stderr()
473 }
474
475 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
476 async fn stdout_filter(
477 &self,
478 prefix: impl Into<String>,
479 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
480 self.underlying().read().await.stdout_filter(prefix.into())
481 }
482
483 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
484 async fn stderr_filter(
485 &self,
486 prefix: impl Into<String>,
487 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
488 self.underlying().read().await.stderr_filter(prefix.into())
489 }
490
491 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
492 async fn tracing_results(&self) -> Option<TracingResults> {
493 self.underlying().read().await.tracing_results().cloned()
494 }
495}
496
497#[expect(missing_docs, reason = "TODO")]
498#[derive(Clone)]
499pub struct TrybuildHost {
500 host: Arc<dyn Host>,
501 display_name: Option<String>,
502 rustflags: Option<String>,
503 additional_hydro_features: Vec<String>,
504 features: Vec<String>,
505 tracing: Option<TracingOptions>,
506 build_envs: Vec<(String, String)>,
507 name_hint: Option<String>,
508 cluster_idx: Option<usize>,
509}
510
511impl From<Arc<dyn Host>> for TrybuildHost {
512 fn from(host: Arc<dyn Host>) -> Self {
513 Self {
514 host,
515 display_name: None,
516 rustflags: None,
517 additional_hydro_features: vec![],
518 features: vec![],
519 tracing: None,
520 build_envs: vec![],
521 name_hint: None,
522 cluster_idx: None,
523 }
524 }
525}
526
527impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
528 fn from(host: Arc<H>) -> Self {
529 Self {
530 host,
531 display_name: None,
532 rustflags: None,
533 additional_hydro_features: vec![],
534 features: vec![],
535 tracing: None,
536 build_envs: vec![],
537 name_hint: None,
538 cluster_idx: None,
539 }
540 }
541}
542
543#[expect(missing_docs, reason = "TODO")]
544impl TrybuildHost {
545 pub fn new(host: Arc<dyn Host>) -> Self {
546 Self {
547 host,
548 display_name: None,
549 rustflags: None,
550 additional_hydro_features: vec![],
551 features: vec![],
552 tracing: None,
553 build_envs: vec![],
554 name_hint: None,
555 cluster_idx: None,
556 }
557 }
558
559 pub fn display_name(self, display_name: impl Into<String>) -> Self {
560 if self.display_name.is_some() {
561 panic!("{} already set", name_of!(display_name in Self));
562 }
563
564 Self {
565 display_name: Some(display_name.into()),
566 ..self
567 }
568 }
569
570 pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
571 if self.rustflags.is_some() {
572 panic!("{} already set", name_of!(rustflags in Self));
573 }
574
575 Self {
576 rustflags: Some(rustflags.into()),
577 ..self
578 }
579 }
580
581 pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
582 Self {
583 additional_hydro_features,
584 ..self
585 }
586 }
587
588 pub fn features(self, features: Vec<String>) -> Self {
589 Self {
590 features: self.features.into_iter().chain(features).collect(),
591 ..self
592 }
593 }
594
595 pub fn tracing(self, tracing: TracingOptions) -> Self {
596 if self.tracing.is_some() {
597 panic!("{} already set", name_of!(tracing in Self));
598 }
599
600 Self {
601 tracing: Some(tracing),
602 ..self
603 }
604 }
605
606 pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
607 Self {
608 build_envs: self
609 .build_envs
610 .into_iter()
611 .chain(std::iter::once((key.into(), value.into())))
612 .collect(),
613 ..self
614 }
615 }
616}
617
618impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
619 type ProcessSpec = TrybuildHost;
620 fn into_process_spec(self) -> TrybuildHost {
621 TrybuildHost {
622 host: self,
623 display_name: None,
624 rustflags: None,
625 additional_hydro_features: vec![],
626 features: vec![],
627 tracing: None,
628 build_envs: vec![],
629 name_hint: None,
630 cluster_idx: None,
631 }
632 }
633}
634
635impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
636 type ProcessSpec = TrybuildHost;
637 fn into_process_spec(self) -> TrybuildHost {
638 TrybuildHost {
639 host: self,
640 display_name: None,
641 rustflags: None,
642 additional_hydro_features: vec![],
643 features: vec![],
644 tracing: None,
645 build_envs: vec![],
646 name_hint: None,
647 cluster_idx: None,
648 }
649 }
650}
651
652#[expect(missing_docs, reason = "TODO")]
653#[derive(Clone)]
654pub struct DeployExternal {
655 next_port: Rc<RefCell<usize>>,
656 host: Arc<dyn Host>,
657 underlying: Rc<RefCell<Option<Arc<RwLock<CustomService>>>>>,
658 client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
659 allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
660}
661
662impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
663 fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
664 self.allocated_ports.borrow_mut().insert(key, port);
665 }
666
667 fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy<'_>>::ExternalRawPort {
668 self.client_ports
669 .borrow()
670 .get(self.allocated_ports.borrow().get(&key).unwrap())
671 .unwrap()
672 .clone()
673 }
674
675 fn as_bytes_bidi(
676 &self,
677 key: usize,
678 ) -> impl Future<
679 Output = (
680 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
681 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
682 ),
683 > + 'a {
684 let port = self.raw_port(key);
685
686 async move {
687 let (source, sink) = port.connect().await.into_source_sink();
688 (
689 Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
690 Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
691 )
692 }
693 }
694
695 fn as_bincode_bidi<InT, OutT>(
696 &self,
697 key: usize,
698 ) -> impl Future<
699 Output = (
700 Pin<Box<dyn Stream<Item = OutT>>>,
701 Pin<Box<dyn Sink<InT, Error = Error>>>,
702 ),
703 > + 'a
704 where
705 InT: Serialize + 'static,
706 OutT: DeserializeOwned + 'static,
707 {
708 let port = self.raw_port(key);
709 async move {
710 let (source, sink) = port.connect().await.into_source_sink();
711 (
712 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
713 as Pin<Box<dyn Stream<Item = OutT>>>,
714 Box::pin(
715 sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
716 ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
717 )
718 }
719 }
720
721 fn as_bincode_sink<T: Serialize + 'static>(
722 &self,
723 key: usize,
724 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
725 let port = self.raw_port(key);
726 async move {
727 let sink = port.connect().await.into_sink();
728 Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
729 as Pin<Box<dyn Sink<T, Error = Error>>>
730 }
731 }
732
733 fn as_bincode_source<T: DeserializeOwned + 'static>(
734 &self,
735 key: usize,
736 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
737 let port = self.raw_port(key);
738 async move {
739 let source = port.connect().await.into_source();
740 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
741 as Pin<Box<dyn Stream<Item = T>>>
742 }
743 }
744}
745
746impl Node for DeployExternal {
747 type Port = String;
748 type Meta = HashMap<usize, Vec<u32>>;
749 type InstantiateEnv = Deployment;
750
751 fn next_port(&self) -> Self::Port {
752 let next_port = *self.next_port.borrow();
753 *self.next_port.borrow_mut() += 1;
754
755 format!("port_{}", next_port)
756 }
757
758 fn instantiate(
759 &self,
760 env: &mut Self::InstantiateEnv,
761 _meta: &mut Self::Meta,
762 _graph: DfirGraph,
763 _extra_stmts: Vec<syn::Stmt>,
764 ) {
765 let service = env.CustomService(self.host.clone(), vec![]);
766 *self.underlying.borrow_mut() = Some(service);
767 }
768
769 fn update_meta(&mut self, _meta: &Self::Meta) {}
770}
771
772impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
773 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
774 DeployExternal {
775 next_port: Rc::new(RefCell::new(0)),
776 host: self,
777 underlying: Rc::new(RefCell::new(None)),
778 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
779 client_ports: Rc::new(RefCell::new(HashMap::new())),
780 }
781 }
782}
783
784impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
785 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
786 DeployExternal {
787 next_port: Rc::new(RefCell::new(0)),
788 host: self,
789 underlying: Rc::new(RefCell::new(None)),
790 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
791 client_ports: Rc::new(RefCell::new(HashMap::new())),
792 }
793 }
794}
795
796pub(crate) enum CrateOrTrybuild {
797 Crate(RustCrate),
798 Trybuild(TrybuildHost),
799}
800
801#[expect(missing_docs, reason = "TODO")]
802#[derive(Clone)]
803pub struct DeployNode {
804 id: usize,
805 next_port: Rc<RefCell<usize>>,
806 service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
807 underlying: Rc<RefCell<Option<Arc<RwLock<RustCrateService>>>>>,
808}
809
810impl DeployCrateWrapper for DeployNode {
811 fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
812 self.underlying.borrow().as_ref().unwrap().clone()
813 }
814}
815
816impl Node for DeployNode {
817 type Port = String;
818 type Meta = HashMap<usize, Vec<u32>>;
819 type InstantiateEnv = Deployment;
820
821 fn next_port(&self) -> String {
822 let next_port = *self.next_port.borrow();
823 *self.next_port.borrow_mut() += 1;
824
825 format!("port_{}", next_port)
826 }
827
828 fn update_meta(&mut self, meta: &Self::Meta) {
829 let underlying_node = self.underlying.borrow();
830 let mut n = underlying_node.as_ref().unwrap().try_write().unwrap();
831 n.update_meta(HydroMeta {
832 clusters: meta.clone(),
833 cluster_id: None,
834 subgraph_id: self.id,
835 });
836 }
837
838 fn instantiate(
839 &self,
840 env: &mut Self::InstantiateEnv,
841 _meta: &mut Self::Meta,
842 graph: DfirGraph,
843 extra_stmts: Vec<syn::Stmt>,
844 ) {
845 let service = match self.service_spec.borrow_mut().take().unwrap() {
846 CrateOrTrybuild::Crate(c) => c,
847 CrateOrTrybuild::Trybuild(trybuild) => {
848 let (bin_name, config) =
849 create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint);
850 create_trybuild_service(
851 trybuild,
852 &config.project_dir,
853 &config.target_dir,
854 &config.features,
855 &bin_name,
856 )
857 }
858 };
859
860 *self.underlying.borrow_mut() = Some(env.add_service(service));
861 }
862}
863
864#[expect(missing_docs, reason = "TODO")]
865#[derive(Clone)]
866pub struct DeployClusterNode {
867 underlying: Arc<RwLock<RustCrateService>>,
868}
869
870impl DeployCrateWrapper for DeployClusterNode {
871 fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
872 self.underlying.clone()
873 }
874}
875#[expect(missing_docs, reason = "TODO")]
876#[derive(Clone)]
877pub struct DeployCluster {
878 id: usize,
879 next_port: Rc<RefCell<usize>>,
880 cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
881 members: Rc<RefCell<Vec<DeployClusterNode>>>,
882 name_hint: Option<String>,
883}
884
885impl DeployCluster {
886 #[expect(missing_docs, reason = "TODO")]
887 pub fn members(&self) -> Vec<DeployClusterNode> {
888 self.members.borrow().clone()
889 }
890}
891
892impl Node for DeployCluster {
893 type Port = String;
894 type Meta = HashMap<usize, Vec<u32>>;
895 type InstantiateEnv = Deployment;
896
897 fn next_port(&self) -> String {
898 let next_port = *self.next_port.borrow();
899 *self.next_port.borrow_mut() += 1;
900
901 format!("port_{}", next_port)
902 }
903
904 fn instantiate(
905 &self,
906 env: &mut Self::InstantiateEnv,
907 meta: &mut Self::Meta,
908 graph: DfirGraph,
909 extra_stmts: Vec<syn::Stmt>,
910 ) {
911 let has_trybuild = self
912 .cluster_spec
913 .borrow()
914 .as_ref()
915 .unwrap()
916 .iter()
917 .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
918
919 let maybe_trybuild = if has_trybuild {
920 Some(create_graph_trybuild(graph, extra_stmts, &self.name_hint))
921 } else {
922 None
923 };
924
925 let cluster_nodes = self
926 .cluster_spec
927 .borrow_mut()
928 .take()
929 .unwrap()
930 .into_iter()
931 .map(|spec| {
932 let service = match spec {
933 CrateOrTrybuild::Crate(c) => c,
934 CrateOrTrybuild::Trybuild(trybuild) => {
935 let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
936 create_trybuild_service(
937 trybuild,
938 &config.project_dir,
939 &config.target_dir,
940 &config.features,
941 bin_name,
942 )
943 }
944 };
945
946 env.add_service(service)
947 })
948 .collect::<Vec<_>>();
949 meta.insert(self.id, (0..(cluster_nodes.len() as u32)).collect());
950 *self.members.borrow_mut() = cluster_nodes
951 .into_iter()
952 .map(|n| DeployClusterNode { underlying: n })
953 .collect();
954 }
955
956 fn update_meta(&mut self, meta: &Self::Meta) {
957 for (cluster_id, node) in self.members.borrow().iter().enumerate() {
958 let mut n = node.underlying.try_write().unwrap();
959 n.update_meta(HydroMeta {
960 clusters: meta.clone(),
961 cluster_id: Some(cluster_id as u32),
962 subgraph_id: self.id,
963 });
964 }
965 }
966}
967
968#[expect(missing_docs, reason = "TODO")]
969#[derive(Clone)]
970pub struct DeployProcessSpec(RustCrate);
971
972impl DeployProcessSpec {
973 #[expect(missing_docs, reason = "TODO")]
974 pub fn new(t: RustCrate) -> Self {
975 Self(t)
976 }
977}
978
979impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
980 fn build(self, id: usize, _name_hint: &str) -> DeployNode {
981 DeployNode {
982 id,
983 next_port: Rc::new(RefCell::new(0)),
984 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0)))),
985 underlying: Rc::new(RefCell::new(None)),
986 }
987 }
988}
989
990impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
991 fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
992 self.name_hint = Some(format!("{} (process {id})", name_hint));
993 DeployNode {
994 id,
995 next_port: Rc::new(RefCell::new(0)),
996 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
997 underlying: Rc::new(RefCell::new(None)),
998 }
999 }
1000}
1001
1002#[expect(missing_docs, reason = "TODO")]
1003#[derive(Clone)]
1004pub struct DeployClusterSpec(Vec<RustCrate>);
1005
1006impl DeployClusterSpec {
1007 #[expect(missing_docs, reason = "TODO")]
1008 pub fn new(crates: Vec<RustCrate>) -> Self {
1009 Self(crates)
1010 }
1011}
1012
1013impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1014 fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
1015 DeployCluster {
1016 id,
1017 next_port: Rc::new(RefCell::new(0)),
1018 cluster_spec: Rc::new(RefCell::new(Some(
1019 self.0.into_iter().map(CrateOrTrybuild::Crate).collect(),
1020 ))),
1021 members: Rc::new(RefCell::new(vec![])),
1022 name_hint: None,
1023 }
1024 }
1025}
1026
1027impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1028 fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1029 let name_hint = format!("{} (cluster {id})", name_hint);
1030 DeployCluster {
1031 id,
1032 next_port: Rc::new(RefCell::new(0)),
1033 cluster_spec: Rc::new(RefCell::new(Some(
1034 self.into_iter()
1035 .enumerate()
1036 .map(|(idx, b)| {
1037 let mut b = b.into();
1038 b.name_hint = Some(name_hint.clone());
1039 b.cluster_idx = Some(idx);
1040 CrateOrTrybuild::Trybuild(b)
1041 })
1042 .collect(),
1043 ))),
1044 members: Rc::new(RefCell::new(vec![])),
1045 name_hint: Some(name_hint),
1046 }
1047 }
1048}
1049
1050fn create_trybuild_service(
1051 trybuild: TrybuildHost,
1052 dir: &std::path::PathBuf,
1053 target_dir: &std::path::PathBuf,
1054 features: &Option<Vec<String>>,
1055 bin_name: &str,
1056) -> RustCrate {
1057 let mut ret = RustCrate::new(dir, trybuild.host)
1058 .target_dir(target_dir)
1059 .bin(bin_name)
1060 .no_default_features();
1061
1062 if let Some(display_name) = trybuild.display_name {
1063 ret = ret.display_name(display_name);
1064 } else if let Some(name_hint) = trybuild.name_hint {
1065 if let Some(cluster_idx) = trybuild.cluster_idx {
1066 ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1067 } else {
1068 ret = ret.display_name(name_hint);
1069 }
1070 }
1071
1072 if let Some(rustflags) = trybuild.rustflags {
1073 ret = ret.rustflags(rustflags);
1074 }
1075
1076 if let Some(tracing) = trybuild.tracing {
1077 ret = ret.tracing(tracing);
1078 }
1079
1080 ret = ret.features(
1081 vec!["hydro___feature_deploy_integration".to_string()]
1082 .into_iter()
1083 .chain(
1084 trybuild
1085 .additional_hydro_features
1086 .into_iter()
1087 .map(|runtime_feature| {
1088 assert!(
1089 HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1090 "{runtime_feature} is not a valid Hydro runtime feature"
1091 );
1092 format!("hydro___feature_{runtime_feature}")
1093 }),
1094 )
1095 .chain(trybuild.features),
1096 );
1097
1098 for (key, value) in trybuild.build_envs {
1099 ret = ret.build_env(key, value);
1100 }
1101
1102 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1103 ret = ret.config("build.incremental = false");
1104
1105 if let Some(features) = features {
1106 ret = ret.features(features);
1107 }
1108
1109 ret
1110}