1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use stageleft::{QuotedWithContext, RuntimeData};
25use syn::parse_quote;
26use tokio::sync::RwLock;
27
28use super::deploy_runtime::*;
29use crate::compile::deploy_provider::{
30 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
31};
32use crate::compile::trybuild::generate::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
33use crate::location::dynamic::LocationId;
34use crate::location::member_id::TaglessMemberId;
35use crate::location::{MembershipEvent, NetworkHint};
36use crate::staging_util::get_this_crate;
37
38pub enum HydroDeploy {}
43
44impl<'a> Deploy<'a> for HydroDeploy {
45 type InstantiateEnv = Deployment;
46 type Process = DeployNode;
47 type Cluster = DeployCluster;
48 type External = DeployExternal;
49 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
50 type GraphId = ();
51 type Port = String;
52 type ExternalRawPort = CustomClientPort;
53
54 fn allocate_process_port(process: &Self::Process) -> Self::Port {
55 process.next_port()
56 }
57
58 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
59 cluster.next_port()
60 }
61
62 fn allocate_external_port(external: &Self::External) -> Self::Port {
63 external.next_port()
64 }
65
66 fn o2o_sink_source(
67 _p1: &Self::Process,
68 p1_port: &Self::Port,
69 _p2: &Self::Process,
70 p2_port: &Self::Port,
71 ) -> (syn::Expr, syn::Expr) {
72 let p1_port = p1_port.as_str();
73 let p2_port = p2_port.as_str();
74 deploy_o2o(
75 RuntimeData::new("__hydro_lang_trybuild_cli"),
76 p1_port,
77 p2_port,
78 )
79 }
80
81 fn o2o_connect(
82 p1: &Self::Process,
83 p1_port: &Self::Port,
84 p2: &Self::Process,
85 p2_port: &Self::Port,
86 ) -> Box<dyn FnOnce()> {
87 let p1 = p1.clone();
88 let p1_port = p1_port.clone();
89 let p2 = p2.clone();
90 let p2_port = p2_port.clone();
91
92 Box::new(move || {
93 let self_underlying_borrow = p1.underlying.borrow();
94 let self_underlying = self_underlying_borrow.as_ref().unwrap();
95 let source_port = self_underlying
96 .try_read()
97 .unwrap()
98 .get_port(p1_port.clone(), self_underlying);
99
100 let other_underlying_borrow = p2.underlying.borrow();
101 let other_underlying = other_underlying_borrow.as_ref().unwrap();
102 let recipient_port = other_underlying
103 .try_read()
104 .unwrap()
105 .get_port(p2_port.clone(), other_underlying);
106
107 source_port.send_to(&recipient_port)
108 })
109 }
110
111 fn o2m_sink_source(
112 _p1: &Self::Process,
113 p1_port: &Self::Port,
114 _c2: &Self::Cluster,
115 c2_port: &Self::Port,
116 ) -> (syn::Expr, syn::Expr) {
117 let p1_port = p1_port.as_str();
118 let c2_port = c2_port.as_str();
119 deploy_o2m(
120 RuntimeData::new("__hydro_lang_trybuild_cli"),
121 p1_port,
122 c2_port,
123 )
124 }
125
126 fn o2m_connect(
127 p1: &Self::Process,
128 p1_port: &Self::Port,
129 c2: &Self::Cluster,
130 c2_port: &Self::Port,
131 ) -> Box<dyn FnOnce()> {
132 let p1 = p1.clone();
133 let p1_port = p1_port.clone();
134 let c2 = c2.clone();
135 let c2_port = c2_port.clone();
136
137 Box::new(move || {
138 let self_underlying_borrow = p1.underlying.borrow();
139 let self_underlying = self_underlying_borrow.as_ref().unwrap();
140 let source_port = self_underlying
141 .try_read()
142 .unwrap()
143 .get_port(p1_port.clone(), self_underlying);
144
145 let recipient_port = DemuxSink {
146 demux: c2
147 .members
148 .borrow()
149 .iter()
150 .enumerate()
151 .map(|(id, c)| {
152 let n = c.underlying.try_read().unwrap();
153 (
154 id as u32,
155 Arc::new(n.get_port(c2_port.clone(), &c.underlying))
156 as Arc<dyn RustCrateSink + 'static>,
157 )
158 })
159 .collect(),
160 };
161
162 source_port.send_to(&recipient_port)
163 })
164 }
165
166 fn m2o_sink_source(
167 _c1: &Self::Cluster,
168 c1_port: &Self::Port,
169 _p2: &Self::Process,
170 p2_port: &Self::Port,
171 ) -> (syn::Expr, syn::Expr) {
172 let c1_port = c1_port.as_str();
173 let p2_port = p2_port.as_str();
174 deploy_m2o(
175 RuntimeData::new("__hydro_lang_trybuild_cli"),
176 c1_port,
177 p2_port,
178 )
179 }
180
181 fn m2o_connect(
182 c1: &Self::Cluster,
183 c1_port: &Self::Port,
184 p2: &Self::Process,
185 p2_port: &Self::Port,
186 ) -> Box<dyn FnOnce()> {
187 let c1 = c1.clone();
188 let c1_port = c1_port.clone();
189 let p2 = p2.clone();
190 let p2_port = p2_port.clone();
191
192 Box::new(move || {
193 let other_underlying_borrow = p2.underlying.borrow();
194 let other_underlying = other_underlying_borrow.as_ref().unwrap();
195 let recipient_port = other_underlying
196 .try_read()
197 .unwrap()
198 .get_port(p2_port.clone(), other_underlying)
199 .merge();
200
201 for (i, node) in c1.members.borrow().iter().enumerate() {
202 let source_port = node
203 .underlying
204 .try_read()
205 .unwrap()
206 .get_port(c1_port.clone(), &node.underlying);
207
208 TaggedSource {
209 source: Arc::new(source_port),
210 tag: i as u32,
211 }
212 .send_to(&recipient_port);
213 }
214 })
215 }
216
217 fn m2m_sink_source(
218 _c1: &Self::Cluster,
219 c1_port: &Self::Port,
220 _c2: &Self::Cluster,
221 c2_port: &Self::Port,
222 ) -> (syn::Expr, syn::Expr) {
223 let c1_port = c1_port.as_str();
224 let c2_port = c2_port.as_str();
225 deploy_m2m(
226 RuntimeData::new("__hydro_lang_trybuild_cli"),
227 c1_port,
228 c2_port,
229 )
230 }
231
232 fn m2m_connect(
233 c1: &Self::Cluster,
234 c1_port: &Self::Port,
235 c2: &Self::Cluster,
236 c2_port: &Self::Port,
237 ) -> Box<dyn FnOnce()> {
238 let c1 = c1.clone();
239 let c1_port = c1_port.clone();
240 let c2 = c2.clone();
241 let c2_port = c2_port.clone();
242
243 Box::new(move || {
244 for (i, sender) in c1.members.borrow().iter().enumerate() {
245 let source_port = sender
246 .underlying
247 .try_read()
248 .unwrap()
249 .get_port(c1_port.clone(), &sender.underlying);
250
251 let recipient_port = DemuxSink {
252 demux: c2
253 .members
254 .borrow()
255 .iter()
256 .enumerate()
257 .map(|(id, c)| {
258 let n = c.underlying.try_read().unwrap();
259 (
260 id as u32,
261 Arc::new(n.get_port(c2_port.clone(), &c.underlying).merge())
262 as Arc<dyn RustCrateSink + 'static>,
263 )
264 })
265 .collect(),
266 };
267
268 TaggedSource {
269 source: Arc::new(source_port),
270 tag: i as u32,
271 }
272 .send_to(&recipient_port);
273 }
274 })
275 }
276
277 fn e2o_many_source(
278 extra_stmts: &mut Vec<syn::Stmt>,
279 _p2: &Self::Process,
280 p2_port: &Self::Port,
281 codec_type: &syn::Type,
282 shared_handle: String,
283 ) -> syn::Expr {
284 let connect_ident = syn::Ident::new(
285 &format!("__hydro_deploy_many_{}_connect", &shared_handle),
286 Span::call_site(),
287 );
288 let source_ident = syn::Ident::new(
289 &format!("__hydro_deploy_many_{}_source", &shared_handle),
290 Span::call_site(),
291 );
292 let sink_ident = syn::Ident::new(
293 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
294 Span::call_site(),
295 );
296 let membership_ident = syn::Ident::new(
297 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
298 Span::call_site(),
299 );
300
301 let root = get_this_crate();
302
303 extra_stmts.push(syn::parse_quote! {
304 let #connect_ident = __hydro_lang_trybuild_cli
305 .port(#p2_port)
306 .connect::<#root::runtime_support::dfir_rs::util::deploy::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
307 });
308
309 extra_stmts.push(syn::parse_quote! {
310 let #source_ident = #connect_ident.source;
311 });
312
313 extra_stmts.push(syn::parse_quote! {
314 let #sink_ident = #connect_ident.sink;
315 });
316
317 extra_stmts.push(syn::parse_quote! {
318 let #membership_ident = #connect_ident.membership;
319 });
320
321 parse_quote!(#source_ident)
322 }
323
324 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
325 let sink_ident = syn::Ident::new(
326 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
327 Span::call_site(),
328 );
329 parse_quote!(#sink_ident)
330 }
331
332 fn e2o_source(
333 extra_stmts: &mut Vec<syn::Stmt>,
334 _p1: &Self::External,
335 _p1_port: &Self::Port,
336 _p2: &Self::Process,
337 p2_port: &Self::Port,
338 codec_type: &syn::Type,
339 shared_handle: String,
340 ) -> syn::Expr {
341 let connect_ident = syn::Ident::new(
342 &format!("__hydro_deploy_{}_connect", &shared_handle),
343 Span::call_site(),
344 );
345 let source_ident = syn::Ident::new(
346 &format!("__hydro_deploy_{}_source", &shared_handle),
347 Span::call_site(),
348 );
349 let sink_ident = syn::Ident::new(
350 &format!("__hydro_deploy_{}_sink", &shared_handle),
351 Span::call_site(),
352 );
353
354 let root = get_this_crate();
355
356 extra_stmts.push(syn::parse_quote! {
357 let #connect_ident = __hydro_lang_trybuild_cli
358 .port(#p2_port)
359 .connect::<#root::runtime_support::dfir_rs::util::deploy::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
360 });
361
362 extra_stmts.push(syn::parse_quote! {
363 let #source_ident = #connect_ident.source;
364 });
365
366 extra_stmts.push(syn::parse_quote! {
367 let #sink_ident = #connect_ident.sink;
368 });
369
370 parse_quote!(#source_ident)
371 }
372
373 fn e2o_connect(
374 p1: &Self::External,
375 p1_port: &Self::Port,
376 p2: &Self::Process,
377 p2_port: &Self::Port,
378 _many: bool,
379 server_hint: NetworkHint,
380 ) -> Box<dyn FnOnce()> {
381 let p1 = p1.clone();
382 let p1_port = p1_port.clone();
383 let p2 = p2.clone();
384 let p2_port = p2_port.clone();
385
386 Box::new(move || {
387 let self_underlying_borrow = p1.underlying.borrow();
388 let self_underlying = self_underlying_borrow.as_ref().unwrap();
389 let source_port = self_underlying
390 .try_read()
391 .unwrap()
392 .declare_many_client(self_underlying);
393
394 let other_underlying_borrow = p2.underlying.borrow();
395 let other_underlying = other_underlying_borrow.as_ref().unwrap();
396 let recipient_port = other_underlying.try_read().unwrap().get_port_with_hint(
397 p2_port.clone(),
398 match server_hint {
399 NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
400 NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
401 },
402 other_underlying,
403 );
404
405 source_port.send_to(&recipient_port);
406
407 p1.client_ports
408 .borrow_mut()
409 .insert(p1_port.clone(), source_port);
410 })
411 }
412
413 fn o2e_sink(
414 _p1: &Self::Process,
415 _p1_port: &Self::Port,
416 _p2: &Self::External,
417 _p2_port: &Self::Port,
418 shared_handle: String,
419 ) -> syn::Expr {
420 let sink_ident = syn::Ident::new(
421 &format!("__hydro_deploy_{}_sink", &shared_handle),
422 Span::call_site(),
423 );
424 parse_quote!(#sink_ident)
425 }
426
427 fn cluster_ids(
428 of_cluster: usize,
429 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
430 cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
431 }
432
433 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
434 cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
435 }
436
437 fn cluster_membership_stream(
438 location_id: &LocationId,
439 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
440 {
441 cluster_membership_stream(location_id)
442 }
443}
444
445#[expect(missing_docs, reason = "TODO")]
446pub trait DeployCrateWrapper {
447 fn underlying(&self) -> Arc<RwLock<RustCrateService>>;
448
449 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
450 async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
451 self.underlying().read().await.stdout()
452 }
453
454 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
455 async fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
456 self.underlying().read().await.stderr()
457 }
458
459 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
460 async fn stdout_filter(
461 &self,
462 prefix: impl Into<String>,
463 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
464 self.underlying().read().await.stdout_filter(prefix.into())
465 }
466
467 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
468 async fn stderr_filter(
469 &self,
470 prefix: impl Into<String>,
471 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
472 self.underlying().read().await.stderr_filter(prefix.into())
473 }
474
475 #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")]
476 async fn tracing_results(&self) -> Option<TracingResults> {
477 self.underlying().read().await.tracing_results().cloned()
478 }
479}
480
481#[expect(missing_docs, reason = "TODO")]
482#[derive(Clone)]
483pub struct TrybuildHost {
484 host: Arc<dyn Host>,
485 display_name: Option<String>,
486 rustflags: Option<String>,
487 additional_hydro_features: Vec<String>,
488 features: Vec<String>,
489 tracing: Option<TracingOptions>,
490 build_envs: Vec<(String, String)>,
491 name_hint: Option<String>,
492 cluster_idx: Option<usize>,
493}
494
495impl From<Arc<dyn Host>> for TrybuildHost {
496 fn from(host: Arc<dyn Host>) -> Self {
497 Self {
498 host,
499 display_name: None,
500 rustflags: None,
501 additional_hydro_features: vec![],
502 features: vec![],
503 tracing: None,
504 build_envs: vec![],
505 name_hint: None,
506 cluster_idx: None,
507 }
508 }
509}
510
511impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
512 fn from(host: Arc<H>) -> Self {
513 Self {
514 host,
515 display_name: None,
516 rustflags: None,
517 additional_hydro_features: vec![],
518 features: vec![],
519 tracing: None,
520 build_envs: vec![],
521 name_hint: None,
522 cluster_idx: None,
523 }
524 }
525}
526
527#[expect(missing_docs, reason = "TODO")]
528impl TrybuildHost {
529 pub fn new(host: Arc<dyn Host>) -> Self {
530 Self {
531 host,
532 display_name: None,
533 rustflags: None,
534 additional_hydro_features: vec![],
535 features: vec![],
536 tracing: None,
537 build_envs: vec![],
538 name_hint: None,
539 cluster_idx: None,
540 }
541 }
542
543 pub fn display_name(self, display_name: impl Into<String>) -> Self {
544 if self.display_name.is_some() {
545 panic!("{} already set", name_of!(display_name in Self));
546 }
547
548 Self {
549 display_name: Some(display_name.into()),
550 ..self
551 }
552 }
553
554 pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
555 if self.rustflags.is_some() {
556 panic!("{} already set", name_of!(rustflags in Self));
557 }
558
559 Self {
560 rustflags: Some(rustflags.into()),
561 ..self
562 }
563 }
564
565 pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
566 Self {
567 additional_hydro_features,
568 ..self
569 }
570 }
571
572 pub fn features(self, features: Vec<String>) -> Self {
573 Self {
574 features: self.features.into_iter().chain(features).collect(),
575 ..self
576 }
577 }
578
579 pub fn tracing(self, tracing: TracingOptions) -> Self {
580 if self.tracing.is_some() {
581 panic!("{} already set", name_of!(tracing in Self));
582 }
583
584 Self {
585 tracing: Some(tracing),
586 ..self
587 }
588 }
589
590 pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
591 Self {
592 build_envs: self
593 .build_envs
594 .into_iter()
595 .chain(std::iter::once((key.into(), value.into())))
596 .collect(),
597 ..self
598 }
599 }
600}
601
602impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
603 type ProcessSpec = TrybuildHost;
604 fn into_process_spec(self) -> TrybuildHost {
605 TrybuildHost {
606 host: self,
607 display_name: None,
608 rustflags: None,
609 additional_hydro_features: vec![],
610 features: vec![],
611 tracing: None,
612 build_envs: vec![],
613 name_hint: None,
614 cluster_idx: None,
615 }
616 }
617}
618
619impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
620 type ProcessSpec = TrybuildHost;
621 fn into_process_spec(self) -> TrybuildHost {
622 TrybuildHost {
623 host: self,
624 display_name: None,
625 rustflags: None,
626 additional_hydro_features: vec![],
627 features: vec![],
628 tracing: None,
629 build_envs: vec![],
630 name_hint: None,
631 cluster_idx: None,
632 }
633 }
634}
635
636#[expect(missing_docs, reason = "TODO")]
637#[derive(Clone)]
638pub struct DeployExternal {
639 next_port: Rc<RefCell<usize>>,
640 host: Arc<dyn Host>,
641 underlying: Rc<RefCell<Option<Arc<RwLock<CustomService>>>>>,
642 client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
643 allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
644}
645
646impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
647 fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
648 assert!(
649 self.allocated_ports
650 .borrow_mut()
651 .insert(key, port.clone())
652 .is_none_or(|old| old == port)
653 );
654 }
655
656 fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy<'_>>::ExternalRawPort {
657 self.client_ports
658 .borrow()
659 .get(self.allocated_ports.borrow().get(&key).unwrap())
660 .unwrap()
661 .clone()
662 }
663
664 fn as_bytes_bidi(
665 &self,
666 key: usize,
667 ) -> impl Future<
668 Output = (
669 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
670 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
671 ),
672 > + 'a {
673 let port = self.raw_port(key);
674
675 async move {
676 let (source, sink) = port.connect().await.into_source_sink();
677 (
678 Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
679 Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
680 )
681 }
682 }
683
684 fn as_bincode_bidi<InT, OutT>(
685 &self,
686 key: usize,
687 ) -> impl Future<
688 Output = (
689 Pin<Box<dyn Stream<Item = OutT>>>,
690 Pin<Box<dyn Sink<InT, Error = Error>>>,
691 ),
692 > + 'a
693 where
694 InT: Serialize + 'static,
695 OutT: DeserializeOwned + 'static,
696 {
697 let port = self.raw_port(key);
698 async move {
699 let (source, sink) = port.connect().await.into_source_sink();
700 (
701 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
702 as Pin<Box<dyn Stream<Item = OutT>>>,
703 Box::pin(
704 sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
705 ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
706 )
707 }
708 }
709
710 fn as_bincode_sink<T: Serialize + 'static>(
711 &self,
712 key: usize,
713 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
714 let port = self.raw_port(key);
715 async move {
716 let sink = port.connect().await.into_sink();
717 Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
718 as Pin<Box<dyn Sink<T, Error = Error>>>
719 }
720 }
721
722 fn as_bincode_source<T: DeserializeOwned + 'static>(
723 &self,
724 key: usize,
725 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
726 let port = self.raw_port(key);
727 async move {
728 let source = port.connect().await.into_source();
729 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
730 as Pin<Box<dyn Stream<Item = T>>>
731 }
732 }
733}
734
735impl Node for DeployExternal {
736 type Port = String;
737 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
738 type InstantiateEnv = Deployment;
739
740 fn next_port(&self) -> Self::Port {
741 let next_port = *self.next_port.borrow();
742 *self.next_port.borrow_mut() += 1;
743
744 format!("port_{}", next_port)
745 }
746
747 fn instantiate(
748 &self,
749 env: &mut Self::InstantiateEnv,
750 _meta: &mut Self::Meta,
751 _graph: DfirGraph,
752 _extra_stmts: Vec<syn::Stmt>,
753 ) {
754 let service = env.CustomService(self.host.clone(), vec![]);
755 *self.underlying.borrow_mut() = Some(service);
756 }
757
758 fn update_meta(&mut self, _meta: &Self::Meta) {}
759}
760
761impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
762 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
763 DeployExternal {
764 next_port: Rc::new(RefCell::new(0)),
765 host: self,
766 underlying: Rc::new(RefCell::new(None)),
767 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
768 client_ports: Rc::new(RefCell::new(HashMap::new())),
769 }
770 }
771}
772
773impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
774 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
775 DeployExternal {
776 next_port: Rc::new(RefCell::new(0)),
777 host: self,
778 underlying: Rc::new(RefCell::new(None)),
779 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
780 client_ports: Rc::new(RefCell::new(HashMap::new())),
781 }
782 }
783}
784
785pub(crate) enum CrateOrTrybuild {
786 Crate(RustCrate, Arc<dyn Host>),
787 Trybuild(TrybuildHost),
788}
789
790#[expect(missing_docs, reason = "TODO")]
791#[derive(Clone)]
792pub struct DeployNode {
793 id: usize,
794 next_port: Rc<RefCell<usize>>,
795 service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
796 underlying: Rc<RefCell<Option<Arc<RwLock<RustCrateService>>>>>,
797}
798
799impl DeployCrateWrapper for DeployNode {
800 fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
801 self.underlying.borrow().as_ref().unwrap().clone()
802 }
803}
804
805impl Node for DeployNode {
806 type Port = String;
807 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
808 type InstantiateEnv = Deployment;
809
810 fn next_port(&self) -> String {
811 let next_port = *self.next_port.borrow();
812 *self.next_port.borrow_mut() += 1;
813
814 format!("port_{}", next_port)
815 }
816
817 fn update_meta(&mut self, meta: &Self::Meta) {
818 let underlying_node = self.underlying.borrow();
819 let mut n = underlying_node.as_ref().unwrap().try_write().unwrap();
820 n.update_meta(HydroMeta {
821 clusters: meta.clone(),
822 cluster_id: None,
823 subgraph_id: self.id,
824 });
825 }
826
827 fn instantiate(
828 &self,
829 env: &mut Self::InstantiateEnv,
830 _meta: &mut Self::Meta,
831 graph: DfirGraph,
832 extra_stmts: Vec<syn::Stmt>,
833 ) {
834 let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
835 CrateOrTrybuild::Crate(c, host) => (c, host),
836 CrateOrTrybuild::Trybuild(trybuild) => {
837 let (bin_name, config) =
838 create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint);
839 let host = trybuild.host.clone();
840 (
841 create_trybuild_service(
842 trybuild,
843 &config.project_dir,
844 &config.target_dir,
845 &config.features,
846 &bin_name,
847 ),
848 host,
849 )
850 }
851 };
852
853 *self.underlying.borrow_mut() = Some(env.add_service(service, host));
854 }
855}
856
857#[expect(missing_docs, reason = "TODO")]
858#[derive(Clone)]
859pub struct DeployClusterNode {
860 underlying: Arc<RwLock<RustCrateService>>,
861}
862
863impl DeployCrateWrapper for DeployClusterNode {
864 fn underlying(&self) -> Arc<RwLock<RustCrateService>> {
865 self.underlying.clone()
866 }
867}
868#[expect(missing_docs, reason = "TODO")]
869#[derive(Clone)]
870pub struct DeployCluster {
871 id: usize,
872 next_port: Rc<RefCell<usize>>,
873 cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
874 members: Rc<RefCell<Vec<DeployClusterNode>>>,
875 name_hint: Option<String>,
876}
877
878impl DeployCluster {
879 #[expect(missing_docs, reason = "TODO")]
880 pub fn members(&self) -> Vec<DeployClusterNode> {
881 self.members.borrow().clone()
882 }
883}
884
885impl Node for DeployCluster {
886 type Port = String;
887 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
888 type InstantiateEnv = Deployment;
889
890 fn next_port(&self) -> String {
891 let next_port = *self.next_port.borrow();
892 *self.next_port.borrow_mut() += 1;
893
894 format!("port_{}", next_port)
895 }
896
897 fn instantiate(
898 &self,
899 env: &mut Self::InstantiateEnv,
900 meta: &mut Self::Meta,
901 graph: DfirGraph,
902 extra_stmts: Vec<syn::Stmt>,
903 ) {
904 let has_trybuild = self
905 .cluster_spec
906 .borrow()
907 .as_ref()
908 .unwrap()
909 .iter()
910 .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
911
912 let maybe_trybuild = if has_trybuild {
913 Some(create_graph_trybuild(graph, extra_stmts, &self.name_hint))
914 } else {
915 None
916 };
917
918 let cluster_nodes = self
919 .cluster_spec
920 .borrow_mut()
921 .take()
922 .unwrap()
923 .into_iter()
924 .map(|spec| {
925 let (service, host) = match spec {
926 CrateOrTrybuild::Crate(c, host) => (c, host),
927 CrateOrTrybuild::Trybuild(trybuild) => {
928 let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
929 let host = trybuild.host.clone();
930 (
931 create_trybuild_service(
932 trybuild,
933 &config.project_dir,
934 &config.target_dir,
935 &config.features,
936 bin_name,
937 ),
938 host,
939 )
940 }
941 };
942
943 env.add_service(service, host)
944 })
945 .collect::<Vec<_>>();
946 meta.insert(
947 self.id,
948 (0..(cluster_nodes.len() as u32))
949 .map(TaglessMemberId::from_raw_id)
950 .collect(),
951 );
952 *self.members.borrow_mut() = cluster_nodes
953 .into_iter()
954 .map(|n| DeployClusterNode { underlying: n })
955 .collect();
956 }
957
958 fn update_meta(&mut self, meta: &Self::Meta) {
959 for (cluster_id, node) in self.members.borrow().iter().enumerate() {
960 let mut n = node.underlying.try_write().unwrap();
961 n.update_meta(HydroMeta {
962 clusters: meta.clone(),
963 cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
964 subgraph_id: self.id,
965 });
966 }
967 }
968}
969
970#[expect(missing_docs, reason = "TODO")]
971#[derive(Clone)]
972pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
973
974impl DeployProcessSpec {
975 #[expect(missing_docs, reason = "TODO")]
976 pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
977 Self(t, host)
978 }
979}
980
981impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
982 fn build(self, id: usize, _name_hint: &str) -> DeployNode {
983 DeployNode {
984 id,
985 next_port: Rc::new(RefCell::new(0)),
986 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
987 underlying: Rc::new(RefCell::new(None)),
988 }
989 }
990}
991
992impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
993 fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
994 self.name_hint = Some(format!("{} (process {id})", name_hint));
995 DeployNode {
996 id,
997 next_port: Rc::new(RefCell::new(0)),
998 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
999 underlying: Rc::new(RefCell::new(None)),
1000 }
1001 }
1002}
1003
1004#[expect(missing_docs, reason = "TODO")]
1005#[derive(Clone)]
1006pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1007
1008impl DeployClusterSpec {
1009 #[expect(missing_docs, reason = "TODO")]
1010 pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
1011 Self(crates)
1012 }
1013}
1014
1015impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1016 fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
1017 DeployCluster {
1018 id,
1019 next_port: Rc::new(RefCell::new(0)),
1020 cluster_spec: Rc::new(RefCell::new(Some(
1021 self.0
1022 .into_iter()
1023 .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1024 .collect(),
1025 ))),
1026 members: Rc::new(RefCell::new(vec![])),
1027 name_hint: None,
1028 }
1029 }
1030}
1031
1032impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1033 fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1034 let name_hint = format!("{} (cluster {id})", name_hint);
1035 DeployCluster {
1036 id,
1037 next_port: Rc::new(RefCell::new(0)),
1038 cluster_spec: Rc::new(RefCell::new(Some(
1039 self.into_iter()
1040 .enumerate()
1041 .map(|(idx, b)| {
1042 let mut b = b.into();
1043 b.name_hint = Some(name_hint.clone());
1044 b.cluster_idx = Some(idx);
1045 CrateOrTrybuild::Trybuild(b)
1046 })
1047 .collect(),
1048 ))),
1049 members: Rc::new(RefCell::new(vec![])),
1050 name_hint: Some(name_hint),
1051 }
1052 }
1053}
1054
1055fn create_trybuild_service(
1056 trybuild: TrybuildHost,
1057 dir: &std::path::PathBuf,
1058 target_dir: &std::path::PathBuf,
1059 features: &Option<Vec<String>>,
1060 bin_name: &str,
1061) -> RustCrate {
1062 let mut ret = RustCrate::new(dir)
1063 .target_dir(target_dir)
1064 .example(bin_name)
1065 .no_default_features();
1066
1067 if let Some(display_name) = trybuild.display_name {
1068 ret = ret.display_name(display_name);
1069 } else if let Some(name_hint) = trybuild.name_hint {
1070 if let Some(cluster_idx) = trybuild.cluster_idx {
1071 ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1072 } else {
1073 ret = ret.display_name(name_hint);
1074 }
1075 }
1076
1077 if let Some(rustflags) = trybuild.rustflags {
1078 ret = ret.rustflags(rustflags);
1079 }
1080
1081 if let Some(tracing) = trybuild.tracing {
1082 ret = ret.tracing(tracing);
1083 }
1084
1085 ret = ret.features(
1086 vec!["hydro___feature_deploy_integration".to_string()]
1087 .into_iter()
1088 .chain(
1089 trybuild
1090 .additional_hydro_features
1091 .into_iter()
1092 .map(|runtime_feature| {
1093 assert!(
1094 HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1095 "{runtime_feature} is not a valid Hydro runtime feature"
1096 );
1097 format!("hydro___feature_{runtime_feature}")
1098 }),
1099 )
1100 .chain(trybuild.features),
1101 );
1102
1103 for (key, value) in trybuild.build_envs {
1104 ret = ret.build_env(key, value);
1105 }
1106
1107 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1108 ret = ret.config("build.incremental = false");
1109
1110 if let Some(features) = features {
1111 ret = ret.features(features);
1112 }
1113
1114 ret
1115}