hydro_deploy/
deployment.rs

1#![expect(
2    mismatched_lifetime_syntaxes,
3    reason = "https://github.com/BrynCooke/buildstructor/issues/200"
4)]
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::{Arc, Weak};
9
10use anyhow::Result;
11use futures::{FutureExt, StreamExt, TryStreamExt};
12use tokio::sync::RwLock;
13
14use super::aws::AwsNetwork;
15use super::gcp::GcpNetwork;
16use super::{
17    CustomService, GcpComputeEngineHost, Host, LocalhostHost, ResourcePool, ResourceResult,
18    Service, progress,
19};
20use crate::{AwsEc2Host, AzureHost, HostTargetType, ServiceBuilder};
21
/// A set of hosts and services that are provisioned, started and stopped together.
///
/// Hosts and services are tracked through `Weak` handles only; the `Arc`s returned by
/// `add_host` / `add_service` are what keep them alive.
pub struct Deployment {
    /// Weak handles to every host registered through `add_host`.
    pub hosts: Vec<Weak<dyn Host>>,
    /// Weak handles to every service registered through `add_service`. Entries whose
    /// service has been dropped are pruned at the start of `deploy`, `start` and `stop`.
    pub services: Vec<Weak<RwLock<dyn Service>>>,
    /// Resource pool handed (mutably) to the resource batch when provisioning in `deploy`.
    pub resource_pool: ResourcePool,
    /// Strong handle to the localhost host registered by `new`; returned by `Localhost`.
    localhost_host: Option<Arc<LocalhostHost>>,
    /// Result of the most recent provisioning; passed into the next `provision` call.
    last_resource_result: Option<Arc<ResourceResult>>,
    /// Id that will be passed to the next host constructor.
    next_host_id: usize,
    /// Id that will be passed to the next service builder.
    next_service_id: usize,
}
31
32impl Default for Deployment {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl Deployment {
39    pub fn new() -> Self {
40        let mut ret = Self {
41            hosts: Vec::new(),
42            services: Vec::new(),
43            resource_pool: ResourcePool::default(),
44            localhost_host: None,
45            last_resource_result: None,
46            next_host_id: 0,
47            next_service_id: 0,
48        };
49
50        ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
51        ret
52    }
53
54    #[expect(non_snake_case, reason = "constructor-esque")]
55    pub fn Localhost(&self) -> Arc<LocalhostHost> {
56        self.localhost_host.clone().unwrap()
57    }
58
59    #[expect(non_snake_case, reason = "constructor-esque")]
60    pub fn CustomService(
61        &mut self,
62        on: Arc<dyn Host>,
63        external_ports: Vec<u16>,
64    ) -> Arc<RwLock<CustomService>> {
65        self.add_service(
66            |id| CustomService::new(id, on.clone(), external_ports),
67            on.clone(),
68        )
69    }
70
71    /// Runs `deploy()`, and `start()`, waits for the trigger future, then runs `stop()`.
72    pub async fn run_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
73        // TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected.
74        self.deploy().await?;
75        self.start().await?;
76        trigger.await;
77        self.stop().await?;
78        Ok(())
79    }
80
81    /// Runs `start()`, waits for the trigger future, then runs `stop()`.
82    /// This is useful if you need to initiate external network connections between
83    /// `deploy()` and `start()`.
84    pub async fn start_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
85        // TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected.
86        self.start().await?;
87        trigger.await;
88        self.stop().await?;
89        Ok(())
90    }
91
92    /// Runs `deploy()`, and `start()`, waits for CTRL+C, then runs `stop()`.
93    pub async fn run_ctrl_c(&mut self) -> Result<()> {
94        self.run_until(tokio::signal::ctrl_c().map(|_| ())).await
95    }
96
97    /// Runs `start()`, waits for CTRL+C, then runs `stop()`.
98    /// This is useful if you need to initiate external network connections between
99    /// `deploy()` and `start()`.
100    pub async fn start_ctrl_c(&mut self) -> Result<()> {
101        self.start_until(tokio::signal::ctrl_c().map(|_| ())).await
102    }
103
104    pub async fn deploy(&mut self) -> Result<()> {
105        self.services.retain(|weak| weak.strong_count() > 0);
106
107        progress::ProgressTracker::with_group("deploy", Some(3), || async {
108            let mut resource_batch = super::ResourceBatch::new();
109
110            for service in self.services.iter().filter_map(Weak::upgrade) {
111                service.read().await.collect_resources(&mut resource_batch);
112            }
113
114            for host in self.hosts.iter().filter_map(Weak::upgrade) {
115                host.collect_resources(&mut resource_batch);
116            }
117
118            let resource_result = Arc::new(
119                progress::ProgressTracker::with_group("provision", Some(1), || async {
120                    resource_batch
121                        .provision(&mut self.resource_pool, self.last_resource_result.clone())
122                        .await
123                })
124                .await?,
125            );
126            self.last_resource_result = Some(resource_result.clone());
127
128            for host in self.hosts.iter().filter_map(Weak::upgrade) {
129                host.provision(&resource_result);
130            }
131
132            let upgraded_services = self
133                .services
134                .iter()
135                .filter_map(Weak::upgrade)
136                .collect::<Vec<_>>();
137
138            progress::ProgressTracker::with_group("prepare", Some(upgraded_services.len()), || {
139                let services_future = upgraded_services
140                    .iter()
141                    .map(|service: &Arc<RwLock<dyn Service>>| {
142                        let resource_result = &resource_result;
143                        async move { service.write().await.deploy(resource_result).await }
144                    })
145                    .collect::<Vec<_>>();
146
147                futures::stream::iter(services_future)
148                    .buffer_unordered(16)
149                    .try_fold((), |_, _| async { Ok(()) })
150            })
151            .await?;
152
153            progress::ProgressTracker::with_group("ready", Some(upgraded_services.len()), || {
154                let all_services_ready =
155                    upgraded_services
156                        .iter()
157                        .map(|service: &Arc<RwLock<dyn Service>>| async move {
158                            service.write().await.ready().await?;
159                            Ok(()) as Result<()>
160                        });
161
162                futures::future::try_join_all(all_services_ready)
163            })
164            .await?;
165
166            Ok(())
167        })
168        .await
169    }
170
171    pub async fn start(&mut self) -> Result<()> {
172        self.services.retain(|weak| weak.strong_count() > 0);
173
174        progress::ProgressTracker::with_group("start", None, || {
175            let all_services_start = self.services.iter().filter_map(Weak::upgrade).map(
176                |service: Arc<RwLock<dyn Service>>| async move {
177                    service.write().await.start().await?;
178                    Ok(()) as Result<()>
179                },
180            );
181
182            futures::future::try_join_all(all_services_start)
183        })
184        .await?;
185        Ok(())
186    }
187
188    pub async fn stop(&mut self) -> Result<()> {
189        self.services.retain(|weak| weak.strong_count() > 0);
190
191        progress::ProgressTracker::with_group("stop", None, || {
192            let all_services_stop = self.services.iter().filter_map(Weak::upgrade).map(
193                |service: Arc<RwLock<dyn Service>>| async move {
194                    service.write().await.stop().await?;
195                    Ok(()) as Result<()>
196                },
197            );
198
199            futures::future::try_join_all(all_services_stop)
200        })
201        .await?;
202        Ok(())
203    }
204}
205
206impl Deployment {
207    pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(&mut self, host: F) -> Arc<T> {
208        let arc = Arc::new(host(self.next_host_id));
209        self.next_host_id += 1;
210
211        self.hosts.push(Arc::downgrade(&arc) as Weak<dyn Host>);
212        arc
213    }
214
215    pub fn add_service<T: Service + 'static>(
216        &mut self,
217        service: impl ServiceBuilder<Service = T>,
218        on: Arc<dyn Host>,
219    ) -> Arc<RwLock<T>> {
220        let arc = Arc::new(RwLock::new(service.build(self.next_service_id, on)));
221        self.next_service_id += 1;
222
223        self.services
224            .push(Arc::downgrade(&arc) as Weak<RwLock<dyn Service>>);
225        arc
226    }
227}
228
229/// Buildstructor methods.
230#[buildstructor::buildstructor]
231impl Deployment {
232    #[builder(entry = "GcpComputeEngineHost", exit = "add")]
233    pub fn add_gcp_compute_engine_host(
234        &mut self,
235        project: String,
236        machine_type: String,
237        image: String,
238        target_type: Option<HostTargetType>,
239        region: String,
240        network: Arc<RwLock<GcpNetwork>>,
241        user: Option<String>,
242        display_name: Option<String>,
243    ) -> Arc<GcpComputeEngineHost> {
244        self.add_host(|id| {
245            GcpComputeEngineHost::new(
246                id,
247                project,
248                machine_type,
249                image,
250                target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
251                region,
252                network,
253                user,
254                display_name,
255            )
256        })
257    }
258
259    #[builder(entry = "AzureHost", exit = "add")]
260    pub fn add_azure_host(
261        &mut self,
262        project: String,
263        os_type: String, // linux or windows
264        machine_size: String,
265        image: Option<HashMap<String, String>>,
266        target_type: Option<HostTargetType>,
267        region: String,
268        user: Option<String>,
269    ) -> Arc<AzureHost> {
270        self.add_host(|id| {
271            AzureHost::new(
272                id,
273                project,
274                os_type,
275                machine_size,
276                image,
277                target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
278                region,
279                user,
280            )
281        })
282    }
283
284    #[builder(entry = "AwsEc2Host", exit = "add")]
285    pub fn add_aws_ec2_host(
286        &mut self,
287        region: String,
288        instance_type: String,
289        target_type: Option<HostTargetType>,
290        ami: String,
291        network: Arc<RwLock<AwsNetwork>>,
292        user: Option<String>,
293        display_name: Option<String>,
294    ) -> Arc<AwsEc2Host> {
295        self.add_host(|id| {
296            AwsEc2Host::new(
297                id,
298                region,
299                instance_type,
300                target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
301                ami,
302                network,
303                user,
304                display_name,
305            )
306        })
307    }
308}