1#![expect(
2 mismatched_lifetime_syntaxes,
3 reason = "https://github.com/BrynCooke/buildstructor/issues/200"
4)]
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::{Arc, Weak};
9
10use anyhow::Result;
11use futures::{FutureExt, StreamExt, TryStreamExt};
12use tokio::sync::RwLock;
13
14use super::aws::AwsNetwork;
15use super::gcp::GcpNetwork;
16use super::{
17 CustomService, GcpComputeEngineHost, Host, LocalhostHost, ResourcePool, ResourceResult,
18 Service, progress,
19};
20use crate::{AwsEc2Host, AzureHost, HostTargetType, ServiceBuilder};
21
22pub struct Deployment {
23 pub hosts: Vec<Weak<dyn Host>>,
24 pub services: Vec<Weak<RwLock<dyn Service>>>,
25 pub resource_pool: ResourcePool,
26 localhost_host: Option<Arc<LocalhostHost>>,
27 last_resource_result: Option<Arc<ResourceResult>>,
28 next_host_id: usize,
29 next_service_id: usize,
30}
31
32impl Default for Deployment {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl Deployment {
39 pub fn new() -> Self {
40 let mut ret = Self {
41 hosts: Vec::new(),
42 services: Vec::new(),
43 resource_pool: ResourcePool::default(),
44 localhost_host: None,
45 last_resource_result: None,
46 next_host_id: 0,
47 next_service_id: 0,
48 };
49
50 ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
51 ret
52 }
53
54 #[expect(non_snake_case, reason = "constructor-esque")]
55 pub fn Localhost(&self) -> Arc<LocalhostHost> {
56 self.localhost_host.clone().unwrap()
57 }
58
59 #[expect(non_snake_case, reason = "constructor-esque")]
60 pub fn CustomService(
61 &mut self,
62 on: Arc<dyn Host>,
63 external_ports: Vec<u16>,
64 ) -> Arc<RwLock<CustomService>> {
65 self.add_service(
66 |id| CustomService::new(id, on.clone(), external_ports),
67 on.clone(),
68 )
69 }
70
71 pub async fn run_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
73 self.deploy().await?;
75 self.start().await?;
76 trigger.await;
77 self.stop().await?;
78 Ok(())
79 }
80
81 pub async fn start_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
85 self.start().await?;
87 trigger.await;
88 self.stop().await?;
89 Ok(())
90 }
91
92 pub async fn run_ctrl_c(&mut self) -> Result<()> {
94 self.run_until(tokio::signal::ctrl_c().map(|_| ())).await
95 }
96
97 pub async fn start_ctrl_c(&mut self) -> Result<()> {
101 self.start_until(tokio::signal::ctrl_c().map(|_| ())).await
102 }
103
104 pub async fn deploy(&mut self) -> Result<()> {
105 self.services.retain(|weak| weak.strong_count() > 0);
106
107 progress::ProgressTracker::with_group("deploy", Some(3), || async {
108 let mut resource_batch = super::ResourceBatch::new();
109
110 for service in self.services.iter().filter_map(Weak::upgrade) {
111 service.read().await.collect_resources(&mut resource_batch);
112 }
113
114 for host in self.hosts.iter().filter_map(Weak::upgrade) {
115 host.collect_resources(&mut resource_batch);
116 }
117
118 let resource_result = Arc::new(
119 progress::ProgressTracker::with_group("provision", Some(1), || async {
120 resource_batch
121 .provision(&mut self.resource_pool, self.last_resource_result.clone())
122 .await
123 })
124 .await?,
125 );
126 self.last_resource_result = Some(resource_result.clone());
127
128 for host in self.hosts.iter().filter_map(Weak::upgrade) {
129 host.provision(&resource_result);
130 }
131
132 let upgraded_services = self
133 .services
134 .iter()
135 .filter_map(Weak::upgrade)
136 .collect::<Vec<_>>();
137
138 progress::ProgressTracker::with_group("prepare", Some(upgraded_services.len()), || {
139 let services_future = upgraded_services
140 .iter()
141 .map(|service: &Arc<RwLock<dyn Service>>| {
142 let resource_result = &resource_result;
143 async move { service.write().await.deploy(resource_result).await }
144 })
145 .collect::<Vec<_>>();
146
147 futures::stream::iter(services_future)
148 .buffer_unordered(16)
149 .try_fold((), |_, _| async { Ok(()) })
150 })
151 .await?;
152
153 progress::ProgressTracker::with_group("ready", Some(upgraded_services.len()), || {
154 let all_services_ready =
155 upgraded_services
156 .iter()
157 .map(|service: &Arc<RwLock<dyn Service>>| async move {
158 service.write().await.ready().await?;
159 Ok(()) as Result<()>
160 });
161
162 futures::future::try_join_all(all_services_ready)
163 })
164 .await?;
165
166 Ok(())
167 })
168 .await
169 }
170
171 pub async fn start(&mut self) -> Result<()> {
172 self.services.retain(|weak| weak.strong_count() > 0);
173
174 progress::ProgressTracker::with_group("start", None, || {
175 let all_services_start = self.services.iter().filter_map(Weak::upgrade).map(
176 |service: Arc<RwLock<dyn Service>>| async move {
177 service.write().await.start().await?;
178 Ok(()) as Result<()>
179 },
180 );
181
182 futures::future::try_join_all(all_services_start)
183 })
184 .await?;
185 Ok(())
186 }
187
188 pub async fn stop(&mut self) -> Result<()> {
189 self.services.retain(|weak| weak.strong_count() > 0);
190
191 progress::ProgressTracker::with_group("stop", None, || {
192 let all_services_stop = self.services.iter().filter_map(Weak::upgrade).map(
193 |service: Arc<RwLock<dyn Service>>| async move {
194 service.write().await.stop().await?;
195 Ok(()) as Result<()>
196 },
197 );
198
199 futures::future::try_join_all(all_services_stop)
200 })
201 .await?;
202 Ok(())
203 }
204}
205
206impl Deployment {
207 pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(&mut self, host: F) -> Arc<T> {
208 let arc = Arc::new(host(self.next_host_id));
209 self.next_host_id += 1;
210
211 self.hosts.push(Arc::downgrade(&arc) as Weak<dyn Host>);
212 arc
213 }
214
215 pub fn add_service<T: Service + 'static>(
216 &mut self,
217 service: impl ServiceBuilder<Service = T>,
218 on: Arc<dyn Host>,
219 ) -> Arc<RwLock<T>> {
220 let arc = Arc::new(RwLock::new(service.build(self.next_service_id, on)));
221 self.next_service_id += 1;
222
223 self.services
224 .push(Arc::downgrade(&arc) as Weak<RwLock<dyn Service>>);
225 arc
226 }
227}
228
229#[buildstructor::buildstructor]
231impl Deployment {
232 #[builder(entry = "GcpComputeEngineHost", exit = "add")]
233 pub fn add_gcp_compute_engine_host(
234 &mut self,
235 project: String,
236 machine_type: String,
237 image: String,
238 target_type: Option<HostTargetType>,
239 region: String,
240 network: Arc<RwLock<GcpNetwork>>,
241 user: Option<String>,
242 display_name: Option<String>,
243 ) -> Arc<GcpComputeEngineHost> {
244 self.add_host(|id| {
245 GcpComputeEngineHost::new(
246 id,
247 project,
248 machine_type,
249 image,
250 target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
251 region,
252 network,
253 user,
254 display_name,
255 )
256 })
257 }
258
259 #[builder(entry = "AzureHost", exit = "add")]
260 pub fn add_azure_host(
261 &mut self,
262 project: String,
263 os_type: String, machine_size: String,
265 image: Option<HashMap<String, String>>,
266 target_type: Option<HostTargetType>,
267 region: String,
268 user: Option<String>,
269 ) -> Arc<AzureHost> {
270 self.add_host(|id| {
271 AzureHost::new(
272 id,
273 project,
274 os_type,
275 machine_size,
276 image,
277 target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
278 region,
279 user,
280 )
281 })
282 }
283
284 #[builder(entry = "AwsEc2Host", exit = "add")]
285 pub fn add_aws_ec2_host(
286 &mut self,
287 region: String,
288 instance_type: String,
289 target_type: Option<HostTargetType>,
290 ami: String,
291 network: Arc<RwLock<AwsNetwork>>,
292 user: Option<String>,
293 display_name: Option<String>,
294 ) -> Arc<AwsEc2Host> {
295 self.add_host(|id| {
296 AwsEc2Host::new(
297 id,
298 region,
299 instance_type,
300 target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
301 ami,
302 network,
303 user,
304 display_name,
305 )
306 })
307 }
308}