hydro_deploy/
deployment.rs

1#![allow(
2    unexpected_cfgs,
3    reason = "https://github.com/BrynCooke/buildstructor/issues/192"
4)]
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::{Arc, Weak};
9
10use anyhow::Result;
11use futures::{FutureExt, StreamExt, TryStreamExt};
12use tokio::sync::RwLock;
13
14use super::gcp::GcpNetwork;
15use super::{
16    CustomService, GcpComputeEngineHost, Host, LocalhostHost, ResourcePool, ResourceResult,
17    Service, progress,
18};
19use crate::{AzureHost, ServiceBuilder};
20
21pub struct Deployment {
22    pub hosts: Vec<Weak<dyn Host>>,
23    pub services: Vec<Weak<RwLock<dyn Service>>>,
24    pub resource_pool: ResourcePool,
25    localhost_host: Option<Arc<LocalhostHost>>,
26    last_resource_result: Option<Arc<ResourceResult>>,
27    next_host_id: usize,
28    next_service_id: usize,
29}
30
31impl Default for Deployment {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl Deployment {
38    pub fn new() -> Self {
39        let mut ret = Self {
40            hosts: Vec::new(),
41            services: Vec::new(),
42            resource_pool: ResourcePool::default(),
43            localhost_host: None,
44            last_resource_result: None,
45            next_host_id: 0,
46            next_service_id: 0,
47        };
48
49        ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
50        ret
51    }
52
53    #[expect(non_snake_case, reason = "constructor-esque")]
54    pub fn Localhost(&self) -> Arc<LocalhostHost> {
55        self.localhost_host.clone().unwrap()
56    }
57
58    #[expect(non_snake_case, reason = "constructor-esque")]
59    pub fn CustomService(
60        &mut self,
61        on: Arc<dyn Host>,
62        external_ports: Vec<u16>,
63    ) -> Arc<RwLock<CustomService>> {
64        self.add_service(|id| CustomService::new(id, on, external_ports))
65    }
66
67    /// Runs `deploy()`, and `start()`, waits for the trigger future, then runs `stop()`.
68    pub async fn run_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
69        // TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected.
70        self.deploy().await?;
71        self.start().await?;
72        trigger.await;
73        self.stop().await?;
74        Ok(())
75    }
76
77    /// Runs `start()`, waits for the trigger future, then runs `stop()`.
78    /// This is useful if you need to initiate external network connections between
79    /// `deploy()` and `start()`.
80    pub async fn start_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
81        // TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected.
82        self.start().await?;
83        trigger.await;
84        self.stop().await?;
85        Ok(())
86    }
87
88    /// Runs `deploy()`, and `start()`, waits for CTRL+C, then runs `stop()`.
89    pub async fn run_ctrl_c(&mut self) -> Result<()> {
90        self.run_until(tokio::signal::ctrl_c().map(|_| ())).await
91    }
92
93    /// Runs `start()`, waits for CTRL+C, then runs `stop()`.
94    /// This is useful if you need to initiate external network connections between
95    /// `deploy()` and `start()`.
96    pub async fn start_ctrl_c(&mut self) -> Result<()> {
97        self.start_until(tokio::signal::ctrl_c().map(|_| ())).await
98    }
99
100    pub async fn deploy(&mut self) -> Result<()> {
101        self.services.retain(|weak| weak.strong_count() > 0);
102
103        progress::ProgressTracker::with_group("deploy", Some(3), || async {
104            let mut resource_batch = super::ResourceBatch::new();
105
106            for service in self.services.iter().filter_map(Weak::upgrade) {
107                service.read().await.collect_resources(&mut resource_batch);
108            }
109
110            for host in self.hosts.iter().filter_map(Weak::upgrade) {
111                host.collect_resources(&mut resource_batch);
112            }
113
114            let resource_result = Arc::new(
115                progress::ProgressTracker::with_group("provision", Some(1), || async {
116                    resource_batch
117                        .provision(&mut self.resource_pool, self.last_resource_result.clone())
118                        .await
119                })
120                .await?,
121            );
122            self.last_resource_result = Some(resource_result.clone());
123
124            for host in self.hosts.iter().filter_map(Weak::upgrade) {
125                host.provision(&resource_result);
126            }
127
128            let upgraded_services = self
129                .services
130                .iter()
131                .filter_map(Weak::upgrade)
132                .collect::<Vec<_>>();
133
134            progress::ProgressTracker::with_group("prepare", Some(upgraded_services.len()), || {
135                let services_future = upgraded_services
136                    .iter()
137                    .map(|service: &Arc<RwLock<dyn Service>>| {
138                        let resource_result = &resource_result;
139                        async move { service.write().await.deploy(resource_result).await }
140                    })
141                    .collect::<Vec<_>>();
142
143                futures::stream::iter(services_future)
144                    .buffer_unordered(16)
145                    .try_fold((), |_, _| async { Ok(()) })
146            })
147            .await?;
148
149            progress::ProgressTracker::with_group("ready", Some(upgraded_services.len()), || {
150                let all_services_ready =
151                    upgraded_services
152                        .iter()
153                        .map(|service: &Arc<RwLock<dyn Service>>| async move {
154                            service.write().await.ready().await?;
155                            Ok(()) as Result<()>
156                        });
157
158                futures::future::try_join_all(all_services_ready)
159            })
160            .await?;
161
162            Ok(())
163        })
164        .await
165    }
166
167    pub async fn start(&mut self) -> Result<()> {
168        self.services.retain(|weak| weak.strong_count() > 0);
169
170        progress::ProgressTracker::with_group("start", None, || {
171            let all_services_start = self.services.iter().filter_map(Weak::upgrade).map(
172                |service: Arc<RwLock<dyn Service>>| async move {
173                    service.write().await.start().await?;
174                    Ok(()) as Result<()>
175                },
176            );
177
178            futures::future::try_join_all(all_services_start)
179        })
180        .await?;
181        Ok(())
182    }
183
184    pub async fn stop(&mut self) -> Result<()> {
185        self.services.retain(|weak| weak.strong_count() > 0);
186
187        progress::ProgressTracker::with_group("stop", None, || {
188            let all_services_stop = self.services.iter().filter_map(Weak::upgrade).map(
189                |service: Arc<RwLock<dyn Service>>| async move {
190                    service.write().await.stop().await?;
191                    Ok(()) as Result<()>
192                },
193            );
194
195            futures::future::try_join_all(all_services_stop)
196        })
197        .await?;
198        Ok(())
199    }
200}
201
202impl Deployment {
203    pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(&mut self, host: F) -> Arc<T> {
204        let arc = Arc::new(host(self.next_host_id));
205        self.next_host_id += 1;
206
207        self.hosts.push(Arc::downgrade(&arc) as Weak<dyn Host>);
208        arc
209    }
210
211    pub fn add_service<T: Service + 'static>(
212        &mut self,
213        service: impl ServiceBuilder<Service = T>,
214    ) -> Arc<RwLock<T>> {
215        let arc = Arc::new(RwLock::new(service.build(self.next_service_id)));
216        self.next_service_id += 1;
217
218        self.services
219            .push(Arc::downgrade(&arc) as Weak<RwLock<dyn Service>>);
220        arc
221    }
222}
223
224/// Buildstructor methods.
225#[buildstructor::buildstructor]
226impl Deployment {
227    #[builder(entry = "GcpComputeEngineHost", exit = "add")]
228    pub fn add_gcp_compute_engine_host(
229        &mut self,
230        project: String,
231        machine_type: String,
232        image: String,
233        region: String,
234        network: Arc<RwLock<GcpNetwork>>,
235        user: Option<String>,
236    ) -> Arc<GcpComputeEngineHost> {
237        self.add_host(|id| {
238            GcpComputeEngineHost::new(id, project, machine_type, image, region, network, user)
239        })
240    }
241
242    #[builder(entry = "AzureHost", exit = "add")]
243    pub fn add_azure_host(
244        &mut self,
245        project: String,
246        os_type: String, // linux or windows
247        machine_size: String,
248        image: Option<HashMap<String, String>>,
249        region: String,
250        user: Option<String>,
251    ) -> Arc<AzureHost> {
252        self.add_host(|id| AzureHost::new(id, project, os_type, machine_size, image, region, user))
253    }
254}