hydro_deploy/
deployment.rs

1#![expect(
2    mismatched_lifetime_syntaxes,
3    reason = "https://github.com/BrynCooke/buildstructor/issues/200"
4)]
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::{Arc, Weak};
9
10use anyhow::Result;
11use futures::{FutureExt, StreamExt, TryStreamExt};
12use tokio::sync::RwLock;
13
14use super::aws::AwsNetwork;
15use super::gcp::GcpNetwork;
16use super::{
17    CustomService, GcpComputeEngineHost, Host, LocalhostHost, ResourcePool, ResourceResult,
18    Service, progress,
19};
20use crate::{AwsEc2Host, AzureHost, ServiceBuilder};
21
22pub struct Deployment {
23    pub hosts: Vec<Weak<dyn Host>>,
24    pub services: Vec<Weak<RwLock<dyn Service>>>,
25    pub resource_pool: ResourcePool,
26    localhost_host: Option<Arc<LocalhostHost>>,
27    last_resource_result: Option<Arc<ResourceResult>>,
28    next_host_id: usize,
29    next_service_id: usize,
30}
31
32impl Default for Deployment {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl Deployment {
39    pub fn new() -> Self {
40        let mut ret = Self {
41            hosts: Vec::new(),
42            services: Vec::new(),
43            resource_pool: ResourcePool::default(),
44            localhost_host: None,
45            last_resource_result: None,
46            next_host_id: 0,
47            next_service_id: 0,
48        };
49
50        ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
51        ret
52    }
53
54    #[expect(non_snake_case, reason = "constructor-esque")]
55    pub fn Localhost(&self) -> Arc<LocalhostHost> {
56        self.localhost_host.clone().unwrap()
57    }
58
59    #[expect(non_snake_case, reason = "constructor-esque")]
60    pub fn CustomService(
61        &mut self,
62        on: Arc<dyn Host>,
63        external_ports: Vec<u16>,
64    ) -> Arc<RwLock<CustomService>> {
65        self.add_service(|id| CustomService::new(id, on, external_ports))
66    }
67
68    /// Runs `deploy()`, and `start()`, waits for the trigger future, then runs `stop()`.
69    pub async fn run_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
70        // TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected.
71        self.deploy().await?;
72        self.start().await?;
73        trigger.await;
74        self.stop().await?;
75        Ok(())
76    }
77
78    /// Runs `start()`, waits for the trigger future, then runs `stop()`.
79    /// This is useful if you need to initiate external network connections between
80    /// `deploy()` and `start()`.
81    pub async fn start_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
82        // TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected.
83        self.start().await?;
84        trigger.await;
85        self.stop().await?;
86        Ok(())
87    }
88
89    /// Runs `deploy()`, and `start()`, waits for CTRL+C, then runs `stop()`.
90    pub async fn run_ctrl_c(&mut self) -> Result<()> {
91        self.run_until(tokio::signal::ctrl_c().map(|_| ())).await
92    }
93
94    /// Runs `start()`, waits for CTRL+C, then runs `stop()`.
95    /// This is useful if you need to initiate external network connections between
96    /// `deploy()` and `start()`.
97    pub async fn start_ctrl_c(&mut self) -> Result<()> {
98        self.start_until(tokio::signal::ctrl_c().map(|_| ())).await
99    }
100
101    pub async fn deploy(&mut self) -> Result<()> {
102        self.services.retain(|weak| weak.strong_count() > 0);
103
104        progress::ProgressTracker::with_group("deploy", Some(3), || async {
105            let mut resource_batch = super::ResourceBatch::new();
106
107            for service in self.services.iter().filter_map(Weak::upgrade) {
108                service.read().await.collect_resources(&mut resource_batch);
109            }
110
111            for host in self.hosts.iter().filter_map(Weak::upgrade) {
112                host.collect_resources(&mut resource_batch);
113            }
114
115            let resource_result = Arc::new(
116                progress::ProgressTracker::with_group("provision", Some(1), || async {
117                    resource_batch
118                        .provision(&mut self.resource_pool, self.last_resource_result.clone())
119                        .await
120                })
121                .await?,
122            );
123            self.last_resource_result = Some(resource_result.clone());
124
125            for host in self.hosts.iter().filter_map(Weak::upgrade) {
126                host.provision(&resource_result);
127            }
128
129            let upgraded_services = self
130                .services
131                .iter()
132                .filter_map(Weak::upgrade)
133                .collect::<Vec<_>>();
134
135            progress::ProgressTracker::with_group("prepare", Some(upgraded_services.len()), || {
136                let services_future = upgraded_services
137                    .iter()
138                    .map(|service: &Arc<RwLock<dyn Service>>| {
139                        let resource_result = &resource_result;
140                        async move { service.write().await.deploy(resource_result).await }
141                    })
142                    .collect::<Vec<_>>();
143
144                futures::stream::iter(services_future)
145                    .buffer_unordered(16)
146                    .try_fold((), |_, _| async { Ok(()) })
147            })
148            .await?;
149
150            progress::ProgressTracker::with_group("ready", Some(upgraded_services.len()), || {
151                let all_services_ready =
152                    upgraded_services
153                        .iter()
154                        .map(|service: &Arc<RwLock<dyn Service>>| async move {
155                            service.write().await.ready().await?;
156                            Ok(()) as Result<()>
157                        });
158
159                futures::future::try_join_all(all_services_ready)
160            })
161            .await?;
162
163            Ok(())
164        })
165        .await
166    }
167
168    pub async fn start(&mut self) -> Result<()> {
169        self.services.retain(|weak| weak.strong_count() > 0);
170
171        progress::ProgressTracker::with_group("start", None, || {
172            let all_services_start = self.services.iter().filter_map(Weak::upgrade).map(
173                |service: Arc<RwLock<dyn Service>>| async move {
174                    service.write().await.start().await?;
175                    Ok(()) as Result<()>
176                },
177            );
178
179            futures::future::try_join_all(all_services_start)
180        })
181        .await?;
182        Ok(())
183    }
184
185    pub async fn stop(&mut self) -> Result<()> {
186        self.services.retain(|weak| weak.strong_count() > 0);
187
188        progress::ProgressTracker::with_group("stop", None, || {
189            let all_services_stop = self.services.iter().filter_map(Weak::upgrade).map(
190                |service: Arc<RwLock<dyn Service>>| async move {
191                    service.write().await.stop().await?;
192                    Ok(()) as Result<()>
193                },
194            );
195
196            futures::future::try_join_all(all_services_stop)
197        })
198        .await?;
199        Ok(())
200    }
201}
202
203impl Deployment {
204    pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(&mut self, host: F) -> Arc<T> {
205        let arc = Arc::new(host(self.next_host_id));
206        self.next_host_id += 1;
207
208        self.hosts.push(Arc::downgrade(&arc) as Weak<dyn Host>);
209        arc
210    }
211
212    pub fn add_service<T: Service + 'static>(
213        &mut self,
214        service: impl ServiceBuilder<Service = T>,
215    ) -> Arc<RwLock<T>> {
216        let arc = Arc::new(RwLock::new(service.build(self.next_service_id)));
217        self.next_service_id += 1;
218
219        self.services
220            .push(Arc::downgrade(&arc) as Weak<RwLock<dyn Service>>);
221        arc
222    }
223}
224
225/// Buildstructor methods.
226#[buildstructor::buildstructor]
227impl Deployment {
228    #[builder(entry = "GcpComputeEngineHost", exit = "add")]
229    pub fn add_gcp_compute_engine_host(
230        &mut self,
231        project: String,
232        machine_type: String,
233        image: String,
234        region: String,
235        network: Arc<RwLock<GcpNetwork>>,
236        user: Option<String>,
237        display_name: Option<String>,
238    ) -> Arc<GcpComputeEngineHost> {
239        self.add_host(|id| {
240            GcpComputeEngineHost::new(
241                id,
242                project,
243                machine_type,
244                image,
245                region,
246                network,
247                user,
248                display_name,
249            )
250        })
251    }
252
253    #[builder(entry = "AzureHost", exit = "add")]
254    pub fn add_azure_host(
255        &mut self,
256        project: String,
257        os_type: String, // linux or windows
258        machine_size: String,
259        image: Option<HashMap<String, String>>,
260        region: String,
261        user: Option<String>,
262    ) -> Arc<AzureHost> {
263        self.add_host(|id| AzureHost::new(id, project, os_type, machine_size, image, region, user))
264    }
265
266    #[builder(entry = "AwsEc2Host", exit = "add")]
267    pub fn add_aws_ec2_host(
268        &mut self,
269        region: String,
270        instance_type: String,
271        ami: String,
272        network: Arc<RwLock<AwsNetwork>>,
273        user: Option<String>,
274        display_name: Option<String>,
275    ) -> Arc<AwsEc2Host> {
276        self.add_host(|id| {
277            AwsEc2Host::new(id, region, instance_type, ami, network, user, display_name)
278        })
279    }
280}