1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use stageleft::{QuotedWithContext, RuntimeData};
25use syn::parse_quote;
26
27use super::deploy_runtime::*;
28use crate::compile::builder::ExternalPortId;
29use crate::compile::deploy_provider::{
30 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
31};
32use crate::compile::trybuild::generate::{
33 HYDRO_RUNTIME_FEATURES, LinkingMode, create_graph_trybuild,
34};
35use crate::location::dynamic::LocationId;
36use crate::location::member_id::TaglessMemberId;
37use crate::location::{MembershipEvent, NetworkHint};
38use crate::staging_util::get_this_crate;
39
/// Marker type for the Hydro Deploy backend.
///
/// The enum has no variants, so it can never be instantiated; it is used purely at the type
/// level, as the implementor of [`Deploy`] and as the type parameter of the spec traits
/// implemented in this file.
pub enum HydroDeploy {}
45
46impl<'a> Deploy<'a> for HydroDeploy {
47 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
48 type InstantiateEnv = Deployment;
49
50 type Process = DeployNode;
51 type Cluster = DeployCluster;
52 type External = DeployExternal;
53
54 fn o2o_sink_source(
55 _p1: &Self::Process,
56 p1_port: &<Self::Process as Node>::Port,
57 _p2: &Self::Process,
58 p2_port: &<Self::Process as Node>::Port,
59 ) -> (syn::Expr, syn::Expr) {
60 let p1_port = p1_port.as_str();
61 let p2_port = p2_port.as_str();
62 deploy_o2o(
63 RuntimeData::new("__hydro_lang_trybuild_cli"),
64 p1_port,
65 p2_port,
66 )
67 }
68
69 fn o2o_connect(
70 p1: &Self::Process,
71 p1_port: &<Self::Process as Node>::Port,
72 p2: &Self::Process,
73 p2_port: &<Self::Process as Node>::Port,
74 ) -> Box<dyn FnOnce()> {
75 let p1 = p1.clone();
76 let p1_port = p1_port.clone();
77 let p2 = p2.clone();
78 let p2_port = p2_port.clone();
79
80 Box::new(move || {
81 let self_underlying_borrow = p1.underlying.borrow();
82 let self_underlying = self_underlying_borrow.as_ref().unwrap();
83 let source_port = self_underlying.get_port(p1_port.clone());
84
85 let other_underlying_borrow = p2.underlying.borrow();
86 let other_underlying = other_underlying_borrow.as_ref().unwrap();
87 let recipient_port = other_underlying.get_port(p2_port.clone());
88
89 source_port.send_to(&recipient_port)
90 })
91 }
92
93 fn o2m_sink_source(
94 _p1: &Self::Process,
95 p1_port: &<Self::Process as Node>::Port,
96 _c2: &Self::Cluster,
97 c2_port: &<Self::Cluster as Node>::Port,
98 ) -> (syn::Expr, syn::Expr) {
99 let p1_port = p1_port.as_str();
100 let c2_port = c2_port.as_str();
101 deploy_o2m(
102 RuntimeData::new("__hydro_lang_trybuild_cli"),
103 p1_port,
104 c2_port,
105 )
106 }
107
108 fn o2m_connect(
109 p1: &Self::Process,
110 p1_port: &<Self::Process as Node>::Port,
111 c2: &Self::Cluster,
112 c2_port: &<Self::Cluster as Node>::Port,
113 ) -> Box<dyn FnOnce()> {
114 let p1 = p1.clone();
115 let p1_port = p1_port.clone();
116 let c2 = c2.clone();
117 let c2_port = c2_port.clone();
118
119 Box::new(move || {
120 let self_underlying_borrow = p1.underlying.borrow();
121 let self_underlying = self_underlying_borrow.as_ref().unwrap();
122 let source_port = self_underlying.get_port(p1_port.clone());
123
124 let recipient_port = DemuxSink {
125 demux: c2
126 .members
127 .borrow()
128 .iter()
129 .enumerate()
130 .map(|(id, c)| {
131 (
132 id as u32,
133 Arc::new(c.underlying.get_port(c2_port.clone()))
134 as Arc<dyn RustCrateSink + 'static>,
135 )
136 })
137 .collect(),
138 };
139
140 source_port.send_to(&recipient_port)
141 })
142 }
143
144 fn m2o_sink_source(
145 _c1: &Self::Cluster,
146 c1_port: &<Self::Cluster as Node>::Port,
147 _p2: &Self::Process,
148 p2_port: &<Self::Process as Node>::Port,
149 ) -> (syn::Expr, syn::Expr) {
150 let c1_port = c1_port.as_str();
151 let p2_port = p2_port.as_str();
152 deploy_m2o(
153 RuntimeData::new("__hydro_lang_trybuild_cli"),
154 c1_port,
155 p2_port,
156 )
157 }
158
159 fn m2o_connect(
160 c1: &Self::Cluster,
161 c1_port: &<Self::Cluster as Node>::Port,
162 p2: &Self::Process,
163 p2_port: &<Self::Process as Node>::Port,
164 ) -> Box<dyn FnOnce()> {
165 let c1 = c1.clone();
166 let c1_port = c1_port.clone();
167 let p2 = p2.clone();
168 let p2_port = p2_port.clone();
169
170 Box::new(move || {
171 let other_underlying_borrow = p2.underlying.borrow();
172 let other_underlying = other_underlying_borrow.as_ref().unwrap();
173 let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
174
175 for (i, node) in c1.members.borrow().iter().enumerate() {
176 let source_port = node.underlying.get_port(c1_port.clone());
177
178 TaggedSource {
179 source: Arc::new(source_port),
180 tag: i as u32,
181 }
182 .send_to(&recipient_port);
183 }
184 })
185 }
186
187 fn m2m_sink_source(
188 _c1: &Self::Cluster,
189 c1_port: &<Self::Cluster as Node>::Port,
190 _c2: &Self::Cluster,
191 c2_port: &<Self::Cluster as Node>::Port,
192 ) -> (syn::Expr, syn::Expr) {
193 let c1_port = c1_port.as_str();
194 let c2_port = c2_port.as_str();
195 deploy_m2m(
196 RuntimeData::new("__hydro_lang_trybuild_cli"),
197 c1_port,
198 c2_port,
199 )
200 }
201
202 fn m2m_connect(
203 c1: &Self::Cluster,
204 c1_port: &<Self::Cluster as Node>::Port,
205 c2: &Self::Cluster,
206 c2_port: &<Self::Cluster as Node>::Port,
207 ) -> Box<dyn FnOnce()> {
208 let c1 = c1.clone();
209 let c1_port = c1_port.clone();
210 let c2 = c2.clone();
211 let c2_port = c2_port.clone();
212
213 Box::new(move || {
214 for (i, sender) in c1.members.borrow().iter().enumerate() {
215 let source_port = sender.underlying.get_port(c1_port.clone());
216
217 let recipient_port = DemuxSink {
218 demux: c2
219 .members
220 .borrow()
221 .iter()
222 .enumerate()
223 .map(|(id, c)| {
224 (
225 id as u32,
226 Arc::new(c.underlying.get_port(c2_port.clone()).merge())
227 as Arc<dyn RustCrateSink + 'static>,
228 )
229 })
230 .collect(),
231 };
232
233 TaggedSource {
234 source: Arc::new(source_port),
235 tag: i as u32,
236 }
237 .send_to(&recipient_port);
238 }
239 })
240 }
241
242 fn e2o_many_source(
243 extra_stmts: &mut Vec<syn::Stmt>,
244 _p2: &Self::Process,
245 p2_port: &<Self::Process as Node>::Port,
246 codec_type: &syn::Type,
247 shared_handle: String,
248 ) -> syn::Expr {
249 let connect_ident = syn::Ident::new(
250 &format!("__hydro_deploy_many_{}_connect", &shared_handle),
251 Span::call_site(),
252 );
253 let source_ident = syn::Ident::new(
254 &format!("__hydro_deploy_many_{}_source", &shared_handle),
255 Span::call_site(),
256 );
257 let sink_ident = syn::Ident::new(
258 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
259 Span::call_site(),
260 );
261 let membership_ident = syn::Ident::new(
262 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
263 Span::call_site(),
264 );
265
266 let root = get_this_crate();
267
268 extra_stmts.push(syn::parse_quote! {
269 let #connect_ident = __hydro_lang_trybuild_cli
270 .port(#p2_port)
271 .connect::<#root::runtime_support::hydro_deploy_integration::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
272 });
273
274 extra_stmts.push(syn::parse_quote! {
275 let #source_ident = #connect_ident.source;
276 });
277
278 extra_stmts.push(syn::parse_quote! {
279 let #sink_ident = #connect_ident.sink;
280 });
281
282 extra_stmts.push(syn::parse_quote! {
283 let #membership_ident = #connect_ident.membership;
284 });
285
286 parse_quote!(#source_ident)
287 }
288
289 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
290 let sink_ident = syn::Ident::new(
291 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
292 Span::call_site(),
293 );
294 parse_quote!(#sink_ident)
295 }
296
297 fn e2o_source(
298 extra_stmts: &mut Vec<syn::Stmt>,
299 _p1: &Self::External,
300 _p1_port: &<Self::External as Node>::Port,
301 _p2: &Self::Process,
302 p2_port: &<Self::Process as Node>::Port,
303 codec_type: &syn::Type,
304 shared_handle: String,
305 ) -> syn::Expr {
306 let connect_ident = syn::Ident::new(
307 &format!("__hydro_deploy_{}_connect", &shared_handle),
308 Span::call_site(),
309 );
310 let source_ident = syn::Ident::new(
311 &format!("__hydro_deploy_{}_source", &shared_handle),
312 Span::call_site(),
313 );
314 let sink_ident = syn::Ident::new(
315 &format!("__hydro_deploy_{}_sink", &shared_handle),
316 Span::call_site(),
317 );
318
319 let root = get_this_crate();
320
321 extra_stmts.push(syn::parse_quote! {
322 let #connect_ident = __hydro_lang_trybuild_cli
323 .port(#p2_port)
324 .connect::<#root::runtime_support::hydro_deploy_integration::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
325 });
326
327 extra_stmts.push(syn::parse_quote! {
328 let #source_ident = #connect_ident.source;
329 });
330
331 extra_stmts.push(syn::parse_quote! {
332 let #sink_ident = #connect_ident.sink;
333 });
334
335 parse_quote!(#source_ident)
336 }
337
338 fn e2o_connect(
339 p1: &Self::External,
340 p1_port: &<Self::External as Node>::Port,
341 p2: &Self::Process,
342 p2_port: &<Self::Process as Node>::Port,
343 _many: bool,
344 server_hint: NetworkHint,
345 ) -> Box<dyn FnOnce()> {
346 let p1 = p1.clone();
347 let p1_port = p1_port.clone();
348 let p2 = p2.clone();
349 let p2_port = p2_port.clone();
350
351 Box::new(move || {
352 let self_underlying_borrow = p1.underlying.borrow();
353 let self_underlying = self_underlying_borrow.as_ref().unwrap();
354 let source_port = self_underlying.declare_many_client();
355
356 let other_underlying_borrow = p2.underlying.borrow();
357 let other_underlying = other_underlying_borrow.as_ref().unwrap();
358 let recipient_port = other_underlying.get_port_with_hint(
359 p2_port.clone(),
360 match server_hint {
361 NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
362 NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
363 },
364 );
365
366 source_port.send_to(&recipient_port);
367
368 p1.client_ports
369 .borrow_mut()
370 .insert(p1_port.clone(), source_port);
371 })
372 }
373
374 fn o2e_sink(
375 _p1: &Self::Process,
376 _p1_port: &<Self::Process as Node>::Port,
377 _p2: &Self::External,
378 _p2_port: &<Self::External as Node>::Port,
379 shared_handle: String,
380 ) -> syn::Expr {
381 let sink_ident = syn::Ident::new(
382 &format!("__hydro_deploy_{}_sink", &shared_handle),
383 Span::call_site(),
384 );
385 parse_quote!(#sink_ident)
386 }
387
388 fn cluster_ids(
389 of_cluster: usize,
390 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
391 cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
392 }
393
394 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
395 cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
396 }
397
398 fn cluster_membership_stream(
399 location_id: &LocationId,
400 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
401 {
402 cluster_membership_stream(location_id)
403 }
404}
405
406#[expect(missing_docs, reason = "TODO")]
407pub trait DeployCrateWrapper {
408 fn underlying(&self) -> Arc<RustCrateService>;
409
410 fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
411 self.underlying().stdout()
412 }
413
414 fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
415 self.underlying().stderr()
416 }
417
418 fn stdout_filter(
419 &self,
420 prefix: impl Into<String>,
421 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
422 self.underlying().stdout_filter(prefix.into())
423 }
424
425 fn stderr_filter(
426 &self,
427 prefix: impl Into<String>,
428 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
429 self.underlying().stderr_filter(prefix.into())
430 }
431}
432
433#[expect(missing_docs, reason = "TODO")]
434#[derive(Clone)]
435pub struct TrybuildHost {
436 host: Arc<dyn Host>,
437 display_name: Option<String>,
438 rustflags: Option<String>,
439 profile: Option<String>,
440 additional_hydro_features: Vec<String>,
441 features: Vec<String>,
442 tracing: Option<TracingOptions>,
443 build_envs: Vec<(String, String)>,
444 name_hint: Option<String>,
445 cluster_idx: Option<usize>,
446}
447
448impl From<Arc<dyn Host>> for TrybuildHost {
449 fn from(host: Arc<dyn Host>) -> Self {
450 Self {
451 host,
452 display_name: None,
453 rustflags: None,
454 profile: None,
455 additional_hydro_features: vec![],
456 features: vec![],
457 tracing: None,
458 build_envs: vec![],
459 name_hint: None,
460 cluster_idx: None,
461 }
462 }
463}
464
465impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
466 fn from(host: Arc<H>) -> Self {
467 Self {
468 host,
469 display_name: None,
470 rustflags: None,
471 profile: None,
472 additional_hydro_features: vec![],
473 features: vec![],
474 tracing: None,
475 build_envs: vec![],
476 name_hint: None,
477 cluster_idx: None,
478 }
479 }
480}
481
482#[expect(missing_docs, reason = "TODO")]
483impl TrybuildHost {
484 pub fn new(host: Arc<dyn Host>) -> Self {
485 Self {
486 host,
487 display_name: None,
488 rustflags: None,
489 profile: None,
490 additional_hydro_features: vec![],
491 features: vec![],
492 tracing: None,
493 build_envs: vec![],
494 name_hint: None,
495 cluster_idx: None,
496 }
497 }
498
499 pub fn display_name(self, display_name: impl Into<String>) -> Self {
500 if self.display_name.is_some() {
501 panic!("{} already set", name_of!(display_name in Self));
502 }
503
504 Self {
505 display_name: Some(display_name.into()),
506 ..self
507 }
508 }
509
510 pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
511 if self.rustflags.is_some() {
512 panic!("{} already set", name_of!(rustflags in Self));
513 }
514
515 Self {
516 rustflags: Some(rustflags.into()),
517 ..self
518 }
519 }
520
521 pub fn profile(self, profile: impl Into<String>) -> Self {
522 if self.profile.is_some() {
523 panic!("{} already set", name_of!(profile in Self));
524 }
525
526 Self {
527 profile: Some(profile.into()),
528 ..self
529 }
530 }
531
532 pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
533 Self {
534 additional_hydro_features,
535 ..self
536 }
537 }
538
539 pub fn features(self, features: Vec<String>) -> Self {
540 Self {
541 features: self.features.into_iter().chain(features).collect(),
542 ..self
543 }
544 }
545
546 pub fn tracing(self, tracing: TracingOptions) -> Self {
547 if self.tracing.is_some() {
548 panic!("{} already set", name_of!(tracing in Self));
549 }
550
551 Self {
552 tracing: Some(tracing),
553 ..self
554 }
555 }
556
557 pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
558 Self {
559 build_envs: self
560 .build_envs
561 .into_iter()
562 .chain(std::iter::once((key.into(), value.into())))
563 .collect(),
564 ..self
565 }
566 }
567}
568
569impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
570 type ProcessSpec = TrybuildHost;
571 fn into_process_spec(self) -> TrybuildHost {
572 TrybuildHost {
573 host: self,
574 display_name: None,
575 rustflags: None,
576 profile: None,
577 additional_hydro_features: vec![],
578 features: vec![],
579 tracing: None,
580 build_envs: vec![],
581 name_hint: None,
582 cluster_idx: None,
583 }
584 }
585}
586
587impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
588 type ProcessSpec = TrybuildHost;
589 fn into_process_spec(self) -> TrybuildHost {
590 TrybuildHost {
591 host: self,
592 display_name: None,
593 rustflags: None,
594 profile: None,
595 additional_hydro_features: vec![],
596 features: vec![],
597 tracing: None,
598 build_envs: vec![],
599 name_hint: None,
600 cluster_idx: None,
601 }
602 }
603}
604
605#[expect(missing_docs, reason = "TODO")]
606#[derive(Clone)]
607pub struct DeployExternal {
608 next_port: Rc<RefCell<usize>>,
609 host: Arc<dyn Host>,
610 underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
611 client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
612 allocated_ports: Rc<RefCell<HashMap<ExternalPortId, String>>>,
613}
614
615impl DeployExternal {
616 pub(crate) fn raw_port(&self, external_port_id: ExternalPortId) -> CustomClientPort {
617 self.client_ports
618 .borrow()
619 .get(
620 self.allocated_ports
621 .borrow()
622 .get(&external_port_id)
623 .unwrap(),
624 )
625 .unwrap()
626 .clone()
627 }
628}
629
630impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
631 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
632 assert!(
633 self.allocated_ports
634 .borrow_mut()
635 .insert(external_port_id, port.clone())
636 .is_none_or(|old| old == port)
637 );
638 }
639
640 fn as_bytes_bidi(
641 &self,
642 external_port_id: ExternalPortId,
643 ) -> impl Future<
644 Output = (
645 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
646 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
647 ),
648 > + 'a {
649 let port = self.raw_port(external_port_id);
650
651 async move {
652 let (source, sink) = port.connect().await.into_source_sink();
653 (
654 Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
655 Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
656 )
657 }
658 }
659
660 fn as_bincode_bidi<InT, OutT>(
661 &self,
662 external_port_id: ExternalPortId,
663 ) -> impl Future<
664 Output = (
665 Pin<Box<dyn Stream<Item = OutT>>>,
666 Pin<Box<dyn Sink<InT, Error = Error>>>,
667 ),
668 > + 'a
669 where
670 InT: Serialize + 'static,
671 OutT: DeserializeOwned + 'static,
672 {
673 let port = self.raw_port(external_port_id);
674 async move {
675 let (source, sink) = port.connect().await.into_source_sink();
676 (
677 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
678 as Pin<Box<dyn Stream<Item = OutT>>>,
679 Box::pin(
680 sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
681 ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
682 )
683 }
684 }
685
686 fn as_bincode_sink<T: Serialize + 'static>(
687 &self,
688 external_port_id: ExternalPortId,
689 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
690 let port = self.raw_port(external_port_id);
691 async move {
692 let sink = port.connect().await.into_sink();
693 Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
694 as Pin<Box<dyn Sink<T, Error = Error>>>
695 }
696 }
697
698 fn as_bincode_source<T: DeserializeOwned + 'static>(
699 &self,
700 external_port_id: ExternalPortId,
701 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
702 let port = self.raw_port(external_port_id);
703 async move {
704 let source = port.connect().await.into_source();
705 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
706 as Pin<Box<dyn Stream<Item = T>>>
707 }
708 }
709}
710
711impl Node for DeployExternal {
712 type Port = String;
713 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
714 type InstantiateEnv = Deployment;
715
716 fn next_port(&self) -> Self::Port {
717 let next_port = *self.next_port.borrow();
718 *self.next_port.borrow_mut() += 1;
719
720 format!("port_{}", next_port)
721 }
722
723 fn instantiate(
724 &self,
725 env: &mut Self::InstantiateEnv,
726 _meta: &mut Self::Meta,
727 _graph: DfirGraph,
728 _extra_stmts: Vec<syn::Stmt>,
729 ) {
730 let service = env.CustomService(self.host.clone(), vec![]);
731 *self.underlying.borrow_mut() = Some(service);
732 }
733
734 fn update_meta(&self, _meta: &Self::Meta) {}
735}
736
737impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
738 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
739 DeployExternal {
740 next_port: Rc::new(RefCell::new(0)),
741 host: self,
742 underlying: Rc::new(RefCell::new(None)),
743 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
744 client_ports: Rc::new(RefCell::new(HashMap::new())),
745 }
746 }
747}
748
749impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
750 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
751 DeployExternal {
752 next_port: Rc::new(RefCell::new(0)),
753 host: self,
754 underlying: Rc::new(RefCell::new(None)),
755 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
756 client_ports: Rc::new(RefCell::new(HashMap::new())),
757 }
758 }
759}
760
/// How the service for a process or cluster member is obtained at instantiation time.
pub(crate) enum CrateOrTrybuild {
    /// A ready-made [`RustCrate`] paired with the host it is registered on.
    Crate(RustCrate, Arc<dyn Host>),
    /// A host whose service is generated via `create_graph_trybuild` and
    /// `create_trybuild_service` during `instantiate`.
    Trybuild(TrybuildHost),
}
765
766#[expect(missing_docs, reason = "TODO")]
767#[derive(Clone)]
768pub struct DeployNode {
769 next_port: Rc<RefCell<usize>>,
770 service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
771 underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
772}
773
774impl DeployCrateWrapper for DeployNode {
775 fn underlying(&self) -> Arc<RustCrateService> {
776 Arc::clone(self.underlying.borrow().as_ref().unwrap())
777 }
778}
779
780impl Node for DeployNode {
781 type Port = String;
782 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
783 type InstantiateEnv = Deployment;
784
785 fn next_port(&self) -> String {
786 let next_port = *self.next_port.borrow();
787 *self.next_port.borrow_mut() += 1;
788
789 format!("port_{}", next_port)
790 }
791
792 fn update_meta(&self, meta: &Self::Meta) {
793 let underlying_node = self.underlying.borrow();
794 underlying_node.as_ref().unwrap().update_meta(HydroMeta {
795 clusters: meta.clone(),
796 cluster_id: None,
797 });
798 }
799
800 fn instantiate(
801 &self,
802 env: &mut Self::InstantiateEnv,
803 _meta: &mut Self::Meta,
804 graph: DfirGraph,
805 extra_stmts: Vec<syn::Stmt>,
806 ) {
807 let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
808 CrateOrTrybuild::Crate(c, host) => (c, host),
809 CrateOrTrybuild::Trybuild(trybuild) => {
810 let linking_mode = if !cfg!(target_os = "windows")
812 && trybuild.host.target_type() == hydro_deploy::HostTargetType::Local
813 {
814 LinkingMode::Dynamic
817 } else {
818 LinkingMode::Static
819 };
820 let (bin_name, config) = create_graph_trybuild(
821 graph,
822 extra_stmts,
823 &trybuild.name_hint,
824 false,
825 linking_mode,
826 );
827 let host = trybuild.host.clone();
828 (
829 create_trybuild_service(
830 trybuild,
831 &config.project_dir,
832 &config.target_dir,
833 &config.features,
834 &bin_name,
835 &config.linking_mode,
836 ),
837 host,
838 )
839 }
840 };
841
842 *self.underlying.borrow_mut() = Some(env.add_service(service, host));
843 }
844}
845
846#[expect(missing_docs, reason = "TODO")]
847#[derive(Clone)]
848pub struct DeployClusterNode {
849 underlying: Arc<RustCrateService>,
850}
851
852impl DeployCrateWrapper for DeployClusterNode {
853 fn underlying(&self) -> Arc<RustCrateService> {
854 self.underlying.clone()
855 }
856}
857#[expect(missing_docs, reason = "TODO")]
858#[derive(Clone)]
859pub struct DeployCluster {
860 id: usize,
861 next_port: Rc<RefCell<usize>>,
862 cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
863 members: Rc<RefCell<Vec<DeployClusterNode>>>,
864 name_hint: Option<String>,
865}
866
867impl DeployCluster {
868 #[expect(missing_docs, reason = "TODO")]
869 pub fn members(&self) -> Vec<DeployClusterNode> {
870 self.members.borrow().clone()
871 }
872}
873
874impl Node for DeployCluster {
875 type Port = String;
876 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
877 type InstantiateEnv = Deployment;
878
879 fn next_port(&self) -> String {
880 let next_port = *self.next_port.borrow();
881 *self.next_port.borrow_mut() += 1;
882
883 format!("port_{}", next_port)
884 }
885
886 fn instantiate(
887 &self,
888 env: &mut Self::InstantiateEnv,
889 meta: &mut Self::Meta,
890 graph: DfirGraph,
891 extra_stmts: Vec<syn::Stmt>,
892 ) {
893 let has_trybuild = self
894 .cluster_spec
895 .borrow()
896 .as_ref()
897 .unwrap()
898 .iter()
899 .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
900
901 let linking_mode = if !cfg!(target_os = "windows")
903 && self
904 .cluster_spec
905 .borrow()
906 .as_ref()
907 .unwrap()
908 .iter()
909 .all(|spec| match spec {
910 CrateOrTrybuild::Crate(_, _) => true, CrateOrTrybuild::Trybuild(t) => {
912 t.host.target_type() == hydro_deploy::HostTargetType::Local
913 }
914 }) {
915 LinkingMode::Dynamic
917 } else {
918 LinkingMode::Static
919 };
920
921 let maybe_trybuild = if has_trybuild {
922 Some(create_graph_trybuild(
923 graph,
924 extra_stmts,
925 &self.name_hint,
926 false,
927 linking_mode,
928 ))
929 } else {
930 None
931 };
932
933 let cluster_nodes = self
934 .cluster_spec
935 .borrow_mut()
936 .take()
937 .unwrap()
938 .into_iter()
939 .map(|spec| {
940 let (service, host) = match spec {
941 CrateOrTrybuild::Crate(c, host) => (c, host),
942 CrateOrTrybuild::Trybuild(trybuild) => {
943 let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
944 let host = trybuild.host.clone();
945 (
946 create_trybuild_service(
947 trybuild,
948 &config.project_dir,
949 &config.target_dir,
950 &config.features,
951 bin_name,
952 &config.linking_mode,
953 ),
954 host,
955 )
956 }
957 };
958
959 env.add_service(service, host)
960 })
961 .collect::<Vec<_>>();
962 meta.insert(
963 self.id,
964 (0..(cluster_nodes.len() as u32))
965 .map(TaglessMemberId::from_raw_id)
966 .collect(),
967 );
968 *self.members.borrow_mut() = cluster_nodes
969 .into_iter()
970 .map(|n| DeployClusterNode { underlying: n })
971 .collect();
972 }
973
974 fn update_meta(&self, meta: &Self::Meta) {
975 for (cluster_id, node) in self.members.borrow().iter().enumerate() {
976 node.underlying.update_meta(HydroMeta {
977 clusters: meta.clone(),
978 cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
979 });
980 }
981 }
982}
983
984#[expect(missing_docs, reason = "TODO")]
985#[derive(Clone)]
986pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
987
988impl DeployProcessSpec {
989 #[expect(missing_docs, reason = "TODO")]
990 pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
991 Self(t, host)
992 }
993}
994
995impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
996 fn build(self, _id: usize, _name_hint: &str) -> DeployNode {
997 DeployNode {
998 next_port: Rc::new(RefCell::new(0)),
999 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
1000 underlying: Rc::new(RefCell::new(None)),
1001 }
1002 }
1003}
1004
1005impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
1006 fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
1007 self.name_hint = Some(format!("{} (process {id})", name_hint));
1008 DeployNode {
1009 next_port: Rc::new(RefCell::new(0)),
1010 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
1011 underlying: Rc::new(RefCell::new(None)),
1012 }
1013 }
1014}
1015
1016#[expect(missing_docs, reason = "TODO")]
1017#[derive(Clone)]
1018pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1019
1020impl DeployClusterSpec {
1021 #[expect(missing_docs, reason = "TODO")]
1022 pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
1023 Self(crates)
1024 }
1025}
1026
1027impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1028 fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
1029 DeployCluster {
1030 id,
1031 next_port: Rc::new(RefCell::new(0)),
1032 cluster_spec: Rc::new(RefCell::new(Some(
1033 self.0
1034 .into_iter()
1035 .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1036 .collect(),
1037 ))),
1038 members: Rc::new(RefCell::new(vec![])),
1039 name_hint: None,
1040 }
1041 }
1042}
1043
1044impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1045 fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1046 let name_hint = format!("{} (cluster {id})", name_hint);
1047 DeployCluster {
1048 id,
1049 next_port: Rc::new(RefCell::new(0)),
1050 cluster_spec: Rc::new(RefCell::new(Some(
1051 self.into_iter()
1052 .enumerate()
1053 .map(|(idx, b)| {
1054 let mut b = b.into();
1055 b.name_hint = Some(name_hint.clone());
1056 b.cluster_idx = Some(idx);
1057 CrateOrTrybuild::Trybuild(b)
1058 })
1059 .collect(),
1060 ))),
1061 members: Rc::new(RefCell::new(vec![])),
1062 name_hint: Some(name_hint),
1063 }
1064 }
1065}
1066
1067fn create_trybuild_service(
1068 trybuild: TrybuildHost,
1069 dir: &std::path::Path,
1070 target_dir: &std::path::PathBuf,
1071 features: &Option<Vec<String>>,
1072 bin_name: &str,
1073 linking_mode: &LinkingMode,
1074) -> RustCrate {
1075 let crate_dir = match linking_mode {
1077 LinkingMode::Dynamic => dir.join("dylib-examples"),
1078 LinkingMode::Static => dir.to_path_buf(),
1079 };
1080
1081 let mut ret = RustCrate::new(&crate_dir)
1082 .target_dir(target_dir)
1083 .example(bin_name)
1084 .no_default_features();
1085
1086 ret = ret.set_is_dylib(matches!(linking_mode, LinkingMode::Dynamic));
1087
1088 if let Some(display_name) = trybuild.display_name {
1089 ret = ret.display_name(display_name);
1090 } else if let Some(name_hint) = trybuild.name_hint {
1091 if let Some(cluster_idx) = trybuild.cluster_idx {
1092 ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1093 } else {
1094 ret = ret.display_name(name_hint);
1095 }
1096 }
1097
1098 if let Some(rustflags) = trybuild.rustflags {
1099 ret = ret.rustflags(rustflags);
1100 }
1101
1102 if let Some(profile) = trybuild.profile {
1103 ret = ret.profile(profile);
1104 }
1105
1106 if let Some(tracing) = trybuild.tracing {
1107 ret = ret.tracing(tracing);
1108 }
1109
1110 ret = ret.features(
1111 vec!["hydro___feature_deploy_integration".to_string()]
1112 .into_iter()
1113 .chain(
1114 trybuild
1115 .additional_hydro_features
1116 .into_iter()
1117 .map(|runtime_feature| {
1118 assert!(
1119 HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1120 "{runtime_feature} is not a valid Hydro runtime feature"
1121 );
1122 format!("hydro___feature_{runtime_feature}")
1123 }),
1124 )
1125 .chain(trybuild.features),
1126 );
1127
1128 for (key, value) in trybuild.build_envs {
1129 ret = ret.build_env(key, value);
1130 }
1131
1132 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1133 ret = ret.config("build.incremental = false");
1134
1135 if let Some(features) = features {
1136 ret = ret.features(features);
1137 }
1138
1139 ret
1140}