hydro_deploy/
custom_service.rs

1use std::any::TypeId;
2use std::ops::Deref;
3use std::sync::{Arc, OnceLock, Weak};
4
5use anyhow::{Result, bail};
6use async_trait::async_trait;
7use hydro_deploy_integration::ConnectedDirect;
8pub use hydro_deploy_integration::ServerPort;
9use tokio::sync::RwLock;
10
11use crate::rust_crate::ports::{
12    ReverseSinkInstantiator, RustCrateServer, RustCrateSink, RustCrateSource, ServerConfig,
13    SourcePath,
14};
15use crate::{
16    BaseServerStrategy, Host, LaunchedHost, ResourceBatch, ResourceResult, ServerStrategy, Service,
17};
18
19/// Represents an unknown, third-party service that is not part of the Hydroflow ecosystem.
20pub struct CustomService {
21    _id: usize,
22    on: Arc<dyn Host>,
23
24    /// The ports that the service wishes to expose to the public internet.
25    external_ports: Vec<u16>,
26
27    launched_host: Option<Arc<dyn LaunchedHost>>,
28}
29
30impl CustomService {
31    pub fn new(id: usize, on: Arc<dyn Host>, external_ports: Vec<u16>) -> Self {
32        Self {
33            _id: id,
34            on,
35            external_ports,
36            launched_host: None,
37        }
38    }
39
40    pub fn declare_client(&self, self_arc: &Arc<RwLock<Self>>) -> CustomClientPort {
41        CustomClientPort::new(Arc::downgrade(self_arc), false)
42    }
43
44    pub fn declare_many_client(&self, self_arc: &Arc<RwLock<Self>>) -> CustomClientPort {
45        CustomClientPort::new(Arc::downgrade(self_arc), true)
46    }
47}
48
49#[async_trait]
50impl Service for CustomService {
51    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {
52        if self.launched_host.is_some() {
53            return;
54        }
55
56        let host = &self.on;
57
58        for port in self.external_ports.iter() {
59            host.request_port_base(&BaseServerStrategy::ExternalTcpPort(*port));
60        }
61    }
62
63    async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
64        if self.launched_host.is_some() {
65            return Ok(());
66        }
67
68        let host = &self.on;
69        let launched = host.provision(resource_result);
70        self.launched_host = Some(launched);
71        Ok(())
72    }
73
74    async fn ready(&mut self) -> Result<()> {
75        Ok(())
76    }
77
78    async fn start(&mut self) -> Result<()> {
79        Ok(())
80    }
81
82    async fn stop(&mut self) -> Result<()> {
83        Ok(())
84    }
85}
86
87#[derive(Clone)]
88pub struct CustomClientPort {
89    pub on: Weak<RwLock<CustomService>>,
90    many: bool,
91    client_port: OnceLock<ServerConfig>,
92}
93
94impl CustomClientPort {
95    fn new(on: Weak<RwLock<CustomService>>, many: bool) -> Self {
96        Self {
97            on,
98            many,
99            client_port: OnceLock::new(),
100        }
101    }
102
103    pub async fn server_port(&self) -> ServerPort {
104        self.client_port
105            .get()
106            .unwrap()
107            .load_instantiated(&|p| p)
108            .await
109    }
110
111    pub async fn connect(&self) -> ConnectedDirect {
112        self.client_port
113            .get()
114            .unwrap()
115            .load_instantiated(&|p| p)
116            .await
117            .instantiate()
118            .await
119            .connect::<ConnectedDirect>()
120    }
121}
122
123impl RustCrateSource for CustomClientPort {
124    fn source_path(&self) -> SourcePath {
125        if self.many {
126            SourcePath::Many(self.on.upgrade().unwrap().try_read().unwrap().on.clone())
127        } else {
128            SourcePath::Direct(self.on.upgrade().unwrap().try_read().unwrap().on.clone())
129        }
130    }
131
132    fn host(&self) -> Arc<dyn Host> {
133        panic!("Custom services cannot be used as the server")
134    }
135
136    fn server(&self) -> Arc<dyn RustCrateServer> {
137        panic!("Custom services cannot be used as the server")
138    }
139
140    fn record_server_config(&self, config: ServerConfig) {
141        self.client_port
142            .set(config)
143            .map_err(drop) // `ServerConfig` doesn't implement `Debug` for `.expect()`.
144            .expect("Cannot call `record_server_config()` multiple times.");
145    }
146
147    fn record_server_strategy(&self, _config: ServerStrategy) {
148        panic!("Custom services cannot be used as the server")
149    }
150}
151
152impl RustCrateSink for CustomClientPort {
153    fn instantiate(&self, _client_path: &SourcePath) -> Result<Box<dyn FnOnce() -> ServerConfig>> {
154        bail!("Custom services cannot be used as the server")
155    }
156
157    fn instantiate_reverse(
158        &self,
159        server_host: &Arc<dyn Host>,
160        server_sink: Arc<dyn RustCrateServer>,
161        wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
162    ) -> Result<ReverseSinkInstantiator> {
163        let client = self.on.upgrade().unwrap();
164        let client_read = client.try_read().unwrap();
165
166        let server_host = server_host.clone();
167
168        let (conn_type, bind_type) =
169            server_host.strategy_as_server(client_read.on.deref(), crate::PortNetworkHint::Auto)?;
170
171        let client_port = wrap_client_port(ServerConfig::from_strategy(&conn_type, server_sink));
172
173        Ok(Box::new(move |me| {
174            assert_eq!(
175                me.type_id(),
176                TypeId::of::<CustomClientPort>(),
177                "`instantiate_reverse` received different type than expected."
178            );
179            me.downcast_ref::<CustomClientPort>()
180                .unwrap()
181                .record_server_config(client_port);
182            ServerStrategy::Direct(bind_type(&*server_host))
183        }))
184    }
185}