1#![expect(
2 mismatched_lifetime_syntaxes,
3 reason = "https://github.com/BrynCooke/buildstructor/issues/200"
4)]
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::{Arc, Weak};
9
10use anyhow::Result;
11use futures::{FutureExt, StreamExt, TryStreamExt};
12use tokio::sync::RwLock;
13
14use super::aws::AwsNetwork;
15use super::gcp::GcpNetwork;
16use super::{
17 CustomService, GcpComputeEngineHost, Host, LocalhostHost, ResourcePool, ResourceResult,
18 Service, progress,
19};
20use crate::{AwsEc2Host, AzureHost, ServiceBuilder};
21
22pub struct Deployment {
23 pub hosts: Vec<Weak<dyn Host>>,
24 pub services: Vec<Weak<RwLock<dyn Service>>>,
25 pub resource_pool: ResourcePool,
26 localhost_host: Option<Arc<LocalhostHost>>,
27 last_resource_result: Option<Arc<ResourceResult>>,
28 next_host_id: usize,
29 next_service_id: usize,
30}
31
32impl Default for Deployment {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl Deployment {
39 pub fn new() -> Self {
40 let mut ret = Self {
41 hosts: Vec::new(),
42 services: Vec::new(),
43 resource_pool: ResourcePool::default(),
44 localhost_host: None,
45 last_resource_result: None,
46 next_host_id: 0,
47 next_service_id: 0,
48 };
49
50 ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
51 ret
52 }
53
54 #[expect(non_snake_case, reason = "constructor-esque")]
55 pub fn Localhost(&self) -> Arc<LocalhostHost> {
56 self.localhost_host.clone().unwrap()
57 }
58
59 #[expect(non_snake_case, reason = "constructor-esque")]
60 pub fn CustomService(
61 &mut self,
62 on: Arc<dyn Host>,
63 external_ports: Vec<u16>,
64 ) -> Arc<RwLock<CustomService>> {
65 self.add_service(|id| CustomService::new(id, on, external_ports))
66 }
67
68 pub async fn run_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
70 self.deploy().await?;
72 self.start().await?;
73 trigger.await;
74 self.stop().await?;
75 Ok(())
76 }
77
78 pub async fn start_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
82 self.start().await?;
84 trigger.await;
85 self.stop().await?;
86 Ok(())
87 }
88
89 pub async fn run_ctrl_c(&mut self) -> Result<()> {
91 self.run_until(tokio::signal::ctrl_c().map(|_| ())).await
92 }
93
94 pub async fn start_ctrl_c(&mut self) -> Result<()> {
98 self.start_until(tokio::signal::ctrl_c().map(|_| ())).await
99 }
100
101 pub async fn deploy(&mut self) -> Result<()> {
102 self.services.retain(|weak| weak.strong_count() > 0);
103
104 progress::ProgressTracker::with_group("deploy", Some(3), || async {
105 let mut resource_batch = super::ResourceBatch::new();
106
107 for service in self.services.iter().filter_map(Weak::upgrade) {
108 service.read().await.collect_resources(&mut resource_batch);
109 }
110
111 for host in self.hosts.iter().filter_map(Weak::upgrade) {
112 host.collect_resources(&mut resource_batch);
113 }
114
115 let resource_result = Arc::new(
116 progress::ProgressTracker::with_group("provision", Some(1), || async {
117 resource_batch
118 .provision(&mut self.resource_pool, self.last_resource_result.clone())
119 .await
120 })
121 .await?,
122 );
123 self.last_resource_result = Some(resource_result.clone());
124
125 for host in self.hosts.iter().filter_map(Weak::upgrade) {
126 host.provision(&resource_result);
127 }
128
129 let upgraded_services = self
130 .services
131 .iter()
132 .filter_map(Weak::upgrade)
133 .collect::<Vec<_>>();
134
135 progress::ProgressTracker::with_group("prepare", Some(upgraded_services.len()), || {
136 let services_future = upgraded_services
137 .iter()
138 .map(|service: &Arc<RwLock<dyn Service>>| {
139 let resource_result = &resource_result;
140 async move { service.write().await.deploy(resource_result).await }
141 })
142 .collect::<Vec<_>>();
143
144 futures::stream::iter(services_future)
145 .buffer_unordered(16)
146 .try_fold((), |_, _| async { Ok(()) })
147 })
148 .await?;
149
150 progress::ProgressTracker::with_group("ready", Some(upgraded_services.len()), || {
151 let all_services_ready =
152 upgraded_services
153 .iter()
154 .map(|service: &Arc<RwLock<dyn Service>>| async move {
155 service.write().await.ready().await?;
156 Ok(()) as Result<()>
157 });
158
159 futures::future::try_join_all(all_services_ready)
160 })
161 .await?;
162
163 Ok(())
164 })
165 .await
166 }
167
168 pub async fn start(&mut self) -> Result<()> {
169 self.services.retain(|weak| weak.strong_count() > 0);
170
171 progress::ProgressTracker::with_group("start", None, || {
172 let all_services_start = self.services.iter().filter_map(Weak::upgrade).map(
173 |service: Arc<RwLock<dyn Service>>| async move {
174 service.write().await.start().await?;
175 Ok(()) as Result<()>
176 },
177 );
178
179 futures::future::try_join_all(all_services_start)
180 })
181 .await?;
182 Ok(())
183 }
184
185 pub async fn stop(&mut self) -> Result<()> {
186 self.services.retain(|weak| weak.strong_count() > 0);
187
188 progress::ProgressTracker::with_group("stop", None, || {
189 let all_services_stop = self.services.iter().filter_map(Weak::upgrade).map(
190 |service: Arc<RwLock<dyn Service>>| async move {
191 service.write().await.stop().await?;
192 Ok(()) as Result<()>
193 },
194 );
195
196 futures::future::try_join_all(all_services_stop)
197 })
198 .await?;
199 Ok(())
200 }
201}
202
203impl Deployment {
204 pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(&mut self, host: F) -> Arc<T> {
205 let arc = Arc::new(host(self.next_host_id));
206 self.next_host_id += 1;
207
208 self.hosts.push(Arc::downgrade(&arc) as Weak<dyn Host>);
209 arc
210 }
211
212 pub fn add_service<T: Service + 'static>(
213 &mut self,
214 service: impl ServiceBuilder<Service = T>,
215 ) -> Arc<RwLock<T>> {
216 let arc = Arc::new(RwLock::new(service.build(self.next_service_id)));
217 self.next_service_id += 1;
218
219 self.services
220 .push(Arc::downgrade(&arc) as Weak<RwLock<dyn Service>>);
221 arc
222 }
223}
224
225#[buildstructor::buildstructor]
227impl Deployment {
228 #[builder(entry = "GcpComputeEngineHost", exit = "add")]
229 pub fn add_gcp_compute_engine_host(
230 &mut self,
231 project: String,
232 machine_type: String,
233 image: String,
234 region: String,
235 network: Arc<RwLock<GcpNetwork>>,
236 user: Option<String>,
237 display_name: Option<String>,
238 ) -> Arc<GcpComputeEngineHost> {
239 self.add_host(|id| {
240 GcpComputeEngineHost::new(
241 id,
242 project,
243 machine_type,
244 image,
245 region,
246 network,
247 user,
248 display_name,
249 )
250 })
251 }
252
253 #[builder(entry = "AzureHost", exit = "add")]
254 pub fn add_azure_host(
255 &mut self,
256 project: String,
257 os_type: String, machine_size: String,
259 image: Option<HashMap<String, String>>,
260 region: String,
261 user: Option<String>,
262 ) -> Arc<AzureHost> {
263 self.add_host(|id| AzureHost::new(id, project, os_type, machine_size, image, region, user))
264 }
265
266 #[builder(entry = "AwsEc2Host", exit = "add")]
267 pub fn add_aws_ec2_host(
268 &mut self,
269 region: String,
270 instance_type: String,
271 ami: String,
272 network: Arc<RwLock<AwsNetwork>>,
273 user: Option<String>,
274 display_name: Option<String>,
275 ) -> Arc<AwsEc2Host> {
276 self.add_host(|id| {
277 AwsEc2Host::new(id, region, instance_type, ami, network, user, display_name)
278 })
279 }
280}