hydro_deploy/
custom_service.rs1use std::any::TypeId;
2use std::ops::Deref;
3use std::sync::{Arc, OnceLock, Weak};
4
5use anyhow::{Result, bail};
6use async_trait::async_trait;
7use hydro_deploy_integration::ConnectedDirect;
8pub use hydro_deploy_integration::ServerPort;
9use tokio::sync::RwLock;
10
11use crate::rust_crate::ports::{
12 ReverseSinkInstantiator, RustCrateServer, RustCrateSink, RustCrateSource, ServerConfig,
13 SourcePath,
14};
15use crate::{
16 BaseServerStrategy, Host, LaunchedHost, ResourceBatch, ResourceResult, ServerStrategy, Service,
17};
18
19pub struct CustomService {
21 _id: usize,
22 on: Arc<dyn Host>,
23
24 external_ports: Vec<u16>,
26
27 launched_host: Option<Arc<dyn LaunchedHost>>,
28}
29
30impl CustomService {
31 pub fn new(id: usize, on: Arc<dyn Host>, external_ports: Vec<u16>) -> Self {
32 Self {
33 _id: id,
34 on,
35 external_ports,
36 launched_host: None,
37 }
38 }
39
40 pub fn declare_client(&self, self_arc: &Arc<RwLock<Self>>) -> CustomClientPort {
41 CustomClientPort::new(Arc::downgrade(self_arc), false)
42 }
43
44 pub fn declare_many_client(&self, self_arc: &Arc<RwLock<Self>>) -> CustomClientPort {
45 CustomClientPort::new(Arc::downgrade(self_arc), true)
46 }
47}
48
49#[async_trait]
50impl Service for CustomService {
51 fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {
52 if self.launched_host.is_some() {
53 return;
54 }
55
56 let host = &self.on;
57
58 for port in self.external_ports.iter() {
59 host.request_port_base(&BaseServerStrategy::ExternalTcpPort(*port));
60 }
61 }
62
63 async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
64 if self.launched_host.is_some() {
65 return Ok(());
66 }
67
68 let host = &self.on;
69 let launched = host.provision(resource_result);
70 self.launched_host = Some(launched);
71 Ok(())
72 }
73
74 async fn ready(&mut self) -> Result<()> {
75 Ok(())
76 }
77
78 async fn start(&mut self) -> Result<()> {
79 Ok(())
80 }
81
82 async fn stop(&mut self) -> Result<()> {
83 Ok(())
84 }
85}
86
87#[derive(Clone)]
88pub struct CustomClientPort {
89 pub on: Weak<RwLock<CustomService>>,
90 many: bool,
91 client_port: OnceLock<ServerConfig>,
92}
93
94impl CustomClientPort {
95 fn new(on: Weak<RwLock<CustomService>>, many: bool) -> Self {
96 Self {
97 on,
98 many,
99 client_port: OnceLock::new(),
100 }
101 }
102
103 pub async fn server_port(&self) -> ServerPort {
104 self.client_port
105 .get()
106 .unwrap()
107 .load_instantiated(&|p| p)
108 .await
109 }
110
111 pub async fn connect(&self) -> ConnectedDirect {
112 self.client_port
113 .get()
114 .unwrap()
115 .load_instantiated(&|p| p)
116 .await
117 .instantiate()
118 .await
119 .connect::<ConnectedDirect>()
120 }
121}
122
123impl RustCrateSource for CustomClientPort {
124 fn source_path(&self) -> SourcePath {
125 if self.many {
126 SourcePath::Many(self.on.upgrade().unwrap().try_read().unwrap().on.clone())
127 } else {
128 SourcePath::Direct(self.on.upgrade().unwrap().try_read().unwrap().on.clone())
129 }
130 }
131
132 fn host(&self) -> Arc<dyn Host> {
133 panic!("Custom services cannot be used as the server")
134 }
135
136 fn server(&self) -> Arc<dyn RustCrateServer> {
137 panic!("Custom services cannot be used as the server")
138 }
139
140 fn record_server_config(&self, config: ServerConfig) {
141 self.client_port
142 .set(config)
143 .map_err(drop) .expect("Cannot call `record_server_config()` multiple times.");
145 }
146
147 fn record_server_strategy(&self, _config: ServerStrategy) {
148 panic!("Custom services cannot be used as the server")
149 }
150}
151
152impl RustCrateSink for CustomClientPort {
153 fn instantiate(&self, _client_path: &SourcePath) -> Result<Box<dyn FnOnce() -> ServerConfig>> {
154 bail!("Custom services cannot be used as the server")
155 }
156
157 fn instantiate_reverse(
158 &self,
159 server_host: &Arc<dyn Host>,
160 server_sink: Arc<dyn RustCrateServer>,
161 wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
162 ) -> Result<ReverseSinkInstantiator> {
163 let client = self.on.upgrade().unwrap();
164 let client_read = client.try_read().unwrap();
165
166 let server_host = server_host.clone();
167
168 let (conn_type, bind_type) =
169 server_host.strategy_as_server(client_read.on.deref(), crate::PortNetworkHint::Auto)?;
170
171 let client_port = wrap_client_port(ServerConfig::from_strategy(&conn_type, server_sink));
172
173 Ok(Box::new(move |me| {
174 assert_eq!(
175 me.type_id(),
176 TypeId::of::<CustomClientPort>(),
177 "`instantiate_reverse` received different type than expected."
178 );
179 me.downcast_ref::<CustomClientPort>()
180 .unwrap()
181 .record_server_config(client_port);
182 ServerStrategy::Direct(bind_type(&*server_host))
183 }))
184 }
185}