hydro_deploy/
custom_service.rs

1use std::any::Any;
2use std::ops::Deref;
3use std::sync::{Arc, OnceLock, Weak};
4
5use anyhow::{Result, bail};
6use async_trait::async_trait;
7use hydro_deploy_integration::{ConnectedDirect, ServerPort};
8use tokio::sync::RwLock;
9
10use crate::rust_crate::ports::{
11    ReverseSinkInstantiator, RustCrateServer, RustCrateSink, RustCrateSource, ServerConfig,
12    SourcePath,
13};
14use crate::{Host, LaunchedHost, ResourceBatch, ResourceResult, ServerStrategy, Service};
15
16/// Represents an unknown, third-party service that is not part of the Hydroflow ecosystem.
17pub struct CustomService {
18    _id: usize,
19    on: Arc<dyn Host>,
20
21    /// The ports that the service wishes to expose to the public internet.
22    external_ports: Vec<u16>,
23
24    launched_host: Option<Arc<dyn LaunchedHost>>,
25}
26
27impl CustomService {
28    pub fn new(id: usize, on: Arc<dyn Host>, external_ports: Vec<u16>) -> Self {
29        Self {
30            _id: id,
31            on,
32            external_ports,
33            launched_host: None,
34        }
35    }
36
37    pub fn declare_client(&self, self_arc: &Arc<RwLock<Self>>) -> CustomClientPort {
38        CustomClientPort::new(Arc::downgrade(self_arc))
39    }
40}
41
42#[async_trait]
43impl Service for CustomService {
44    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {
45        if self.launched_host.is_some() {
46            return;
47        }
48
49        let host = &self.on;
50
51        for port in self.external_ports.iter() {
52            host.request_port(&ServerStrategy::ExternalTcpPort(*port));
53        }
54    }
55
56    async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
57        if self.launched_host.is_some() {
58            return Ok(());
59        }
60
61        let host = &self.on;
62        let launched = host.provision(resource_result);
63        self.launched_host = Some(launched);
64        Ok(())
65    }
66
67    async fn ready(&mut self) -> Result<()> {
68        Ok(())
69    }
70
71    async fn start(&mut self) -> Result<()> {
72        Ok(())
73    }
74
75    async fn stop(&mut self) -> Result<()> {
76        Ok(())
77    }
78}
79
80pub struct CustomClientPort {
81    pub on: Weak<RwLock<CustomService>>,
82    client_port: OnceLock<ServerConfig>,
83}
84
85impl CustomClientPort {
86    pub fn new(on: Weak<RwLock<CustomService>>) -> Self {
87        Self {
88            on,
89            client_port: OnceLock::new(),
90        }
91    }
92
93    pub async fn server_port(&self) -> ServerPort {
94        self.client_port
95            .get()
96            .unwrap()
97            .load_instantiated(&|p| p)
98            .await
99    }
100
101    pub async fn connect(&self) -> ConnectedDirect {
102        self.client_port
103            .get()
104            .unwrap()
105            .load_instantiated(&|p| p)
106            .await
107            .instantiate()
108            .connect::<ConnectedDirect>()
109            .await
110    }
111}
112
113impl RustCrateSource for CustomClientPort {
114    fn source_path(&self) -> SourcePath {
115        SourcePath::Direct(self.on.upgrade().unwrap().try_read().unwrap().on.clone())
116    }
117
118    fn host(&self) -> Arc<dyn Host> {
119        panic!("Custom services cannot be used as the server")
120    }
121
122    fn server(&self) -> Arc<dyn RustCrateServer> {
123        panic!("Custom services cannot be used as the server")
124    }
125
126    fn record_server_config(&self, config: ServerConfig) {
127        self.client_port
128            .set(config)
129            .map_err(drop) // `ServerConfig` doesn't implement `Debug` for `.expect()`.
130            .expect("Cannot call `record_server_config()` multiple times.");
131    }
132
133    fn record_server_strategy(&self, _config: ServerStrategy) {
134        panic!("Custom services cannot be used as the server")
135    }
136}
137
138impl RustCrateSink for CustomClientPort {
139    fn as_any(&self) -> &dyn Any {
140        self
141    }
142
143    fn instantiate(&self, _client_path: &SourcePath) -> Result<Box<dyn FnOnce() -> ServerConfig>> {
144        bail!("Custom services cannot be used as the server")
145    }
146
147    fn instantiate_reverse(
148        &self,
149        server_host: &Arc<dyn Host>,
150        server_sink: Arc<dyn RustCrateServer>,
151        wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
152    ) -> Result<ReverseSinkInstantiator> {
153        let client = self.on.upgrade().unwrap();
154        let client_read = client.try_read().unwrap();
155
156        let server_host = server_host.clone();
157
158        let (conn_type, bind_type) = server_host.strategy_as_server(client_read.on.deref())?;
159
160        let client_port = wrap_client_port(ServerConfig::from_strategy(&conn_type, server_sink));
161
162        Ok(Box::new(move |me| {
163            me.downcast_ref::<CustomClientPort>()
164                .unwrap()
165                .record_server_config(client_port);
166            bind_type(server_host.as_any())
167        }))
168    }
169}