hydro_deploy/
custom_service.rs
1use std::any::Any;
2use std::ops::Deref;
3use std::sync::{Arc, OnceLock, Weak};
4
5use anyhow::{Result, bail};
6use async_trait::async_trait;
7use hydro_deploy_integration::{ConnectedDirect, ServerPort};
8use tokio::sync::RwLock;
9
10use crate::rust_crate::ports::{
11 ReverseSinkInstantiator, RustCrateServer, RustCrateSink, RustCrateSource, ServerConfig,
12 SourcePath,
13};
14use crate::{Host, LaunchedHost, ResourceBatch, ResourceResult, ServerStrategy, Service};
15
16pub struct CustomService {
18 _id: usize,
19 on: Arc<dyn Host>,
20
21 external_ports: Vec<u16>,
23
24 launched_host: Option<Arc<dyn LaunchedHost>>,
25}
26
27impl CustomService {
28 pub fn new(id: usize, on: Arc<dyn Host>, external_ports: Vec<u16>) -> Self {
29 Self {
30 _id: id,
31 on,
32 external_ports,
33 launched_host: None,
34 }
35 }
36
37 pub fn declare_client(&self, self_arc: &Arc<RwLock<Self>>) -> CustomClientPort {
38 CustomClientPort::new(Arc::downgrade(self_arc))
39 }
40}
41
42#[async_trait]
43impl Service for CustomService {
44 fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {
45 if self.launched_host.is_some() {
46 return;
47 }
48
49 let host = &self.on;
50
51 for port in self.external_ports.iter() {
52 host.request_port(&ServerStrategy::ExternalTcpPort(*port));
53 }
54 }
55
56 async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
57 if self.launched_host.is_some() {
58 return Ok(());
59 }
60
61 let host = &self.on;
62 let launched = host.provision(resource_result);
63 self.launched_host = Some(launched);
64 Ok(())
65 }
66
67 async fn ready(&mut self) -> Result<()> {
68 Ok(())
69 }
70
71 async fn start(&mut self) -> Result<()> {
72 Ok(())
73 }
74
75 async fn stop(&mut self) -> Result<()> {
76 Ok(())
77 }
78}
79
80pub struct CustomClientPort {
81 pub on: Weak<RwLock<CustomService>>,
82 client_port: OnceLock<ServerConfig>,
83}
84
85impl CustomClientPort {
86 pub fn new(on: Weak<RwLock<CustomService>>) -> Self {
87 Self {
88 on,
89 client_port: OnceLock::new(),
90 }
91 }
92
93 pub async fn server_port(&self) -> ServerPort {
94 self.client_port
95 .get()
96 .unwrap()
97 .load_instantiated(&|p| p)
98 .await
99 }
100
101 pub async fn connect(&self) -> ConnectedDirect {
102 self.client_port
103 .get()
104 .unwrap()
105 .load_instantiated(&|p| p)
106 .await
107 .instantiate()
108 .connect::<ConnectedDirect>()
109 .await
110 }
111}
112
113impl RustCrateSource for CustomClientPort {
114 fn source_path(&self) -> SourcePath {
115 SourcePath::Direct(self.on.upgrade().unwrap().try_read().unwrap().on.clone())
116 }
117
118 fn host(&self) -> Arc<dyn Host> {
119 panic!("Custom services cannot be used as the server")
120 }
121
122 fn server(&self) -> Arc<dyn RustCrateServer> {
123 panic!("Custom services cannot be used as the server")
124 }
125
126 fn record_server_config(&self, config: ServerConfig) {
127 self.client_port
128 .set(config)
129 .map_err(drop) .expect("Cannot call `record_server_config()` multiple times.");
131 }
132
133 fn record_server_strategy(&self, _config: ServerStrategy) {
134 panic!("Custom services cannot be used as the server")
135 }
136}
137
138impl RustCrateSink for CustomClientPort {
139 fn as_any(&self) -> &dyn Any {
140 self
141 }
142
143 fn instantiate(&self, _client_path: &SourcePath) -> Result<Box<dyn FnOnce() -> ServerConfig>> {
144 bail!("Custom services cannot be used as the server")
145 }
146
147 fn instantiate_reverse(
148 &self,
149 server_host: &Arc<dyn Host>,
150 server_sink: Arc<dyn RustCrateServer>,
151 wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
152 ) -> Result<ReverseSinkInstantiator> {
153 let client = self.on.upgrade().unwrap();
154 let client_read = client.try_read().unwrap();
155
156 let server_host = server_host.clone();
157
158 let (conn_type, bind_type) = server_host.strategy_as_server(client_read.on.deref())?;
159
160 let client_port = wrap_client_port(ServerConfig::from_strategy(&conn_type, server_sink));
161
162 Ok(Box::new(move |me| {
163 me.downcast_ref::<CustomClientPort>()
164 .unwrap()
165 .record_server_config(client_port);
166 bind_type(server_host.as_any())
167 }))
168 }
169}