1#![allow(
2 unexpected_cfgs,
3 reason = "https://github.com/BrynCooke/buildstructor/issues/192"
4)]
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::{Arc, Weak};
9
10use anyhow::Result;
11use futures::{FutureExt, StreamExt, TryStreamExt};
12use tokio::sync::RwLock;
13
14use super::gcp::GcpNetwork;
15use super::{
16 CustomService, GcpComputeEngineHost, Host, LocalhostHost, ResourcePool, ResourceResult,
17 Service, progress,
18};
19use crate::{AzureHost, ServiceBuilder};
20
21pub struct Deployment {
22 pub hosts: Vec<Weak<dyn Host>>,
23 pub services: Vec<Weak<RwLock<dyn Service>>>,
24 pub resource_pool: ResourcePool,
25 localhost_host: Option<Arc<LocalhostHost>>,
26 last_resource_result: Option<Arc<ResourceResult>>,
27 next_host_id: usize,
28 next_service_id: usize,
29}
30
31impl Default for Deployment {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl Deployment {
38 pub fn new() -> Self {
39 let mut ret = Self {
40 hosts: Vec::new(),
41 services: Vec::new(),
42 resource_pool: ResourcePool::default(),
43 localhost_host: None,
44 last_resource_result: None,
45 next_host_id: 0,
46 next_service_id: 0,
47 };
48
49 ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
50 ret
51 }
52
53 #[expect(non_snake_case, reason = "constructor-esque")]
54 pub fn Localhost(&self) -> Arc<LocalhostHost> {
55 self.localhost_host.clone().unwrap()
56 }
57
58 #[expect(non_snake_case, reason = "constructor-esque")]
59 pub fn CustomService(
60 &mut self,
61 on: Arc<dyn Host>,
62 external_ports: Vec<u16>,
63 ) -> Arc<RwLock<CustomService>> {
64 self.add_service(|id| CustomService::new(id, on, external_ports))
65 }
66
67 pub async fn run_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
69 self.deploy().await?;
71 self.start().await?;
72 trigger.await;
73 self.stop().await?;
74 Ok(())
75 }
76
77 pub async fn start_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
81 self.start().await?;
83 trigger.await;
84 self.stop().await?;
85 Ok(())
86 }
87
88 pub async fn run_ctrl_c(&mut self) -> Result<()> {
90 self.run_until(tokio::signal::ctrl_c().map(|_| ())).await
91 }
92
93 pub async fn start_ctrl_c(&mut self) -> Result<()> {
97 self.start_until(tokio::signal::ctrl_c().map(|_| ())).await
98 }
99
100 pub async fn deploy(&mut self) -> Result<()> {
101 self.services.retain(|weak| weak.strong_count() > 0);
102
103 progress::ProgressTracker::with_group("deploy", Some(3), || async {
104 let mut resource_batch = super::ResourceBatch::new();
105
106 for service in self.services.iter().filter_map(Weak::upgrade) {
107 service.read().await.collect_resources(&mut resource_batch);
108 }
109
110 for host in self.hosts.iter().filter_map(Weak::upgrade) {
111 host.collect_resources(&mut resource_batch);
112 }
113
114 let resource_result = Arc::new(
115 progress::ProgressTracker::with_group("provision", Some(1), || async {
116 resource_batch
117 .provision(&mut self.resource_pool, self.last_resource_result.clone())
118 .await
119 })
120 .await?,
121 );
122 self.last_resource_result = Some(resource_result.clone());
123
124 for host in self.hosts.iter().filter_map(Weak::upgrade) {
125 host.provision(&resource_result);
126 }
127
128 let upgraded_services = self
129 .services
130 .iter()
131 .filter_map(Weak::upgrade)
132 .collect::<Vec<_>>();
133
134 progress::ProgressTracker::with_group("prepare", Some(upgraded_services.len()), || {
135 let services_future = upgraded_services
136 .iter()
137 .map(|service: &Arc<RwLock<dyn Service>>| {
138 let resource_result = &resource_result;
139 async move { service.write().await.deploy(resource_result).await }
140 })
141 .collect::<Vec<_>>();
142
143 futures::stream::iter(services_future)
144 .buffer_unordered(16)
145 .try_fold((), |_, _| async { Ok(()) })
146 })
147 .await?;
148
149 progress::ProgressTracker::with_group("ready", Some(upgraded_services.len()), || {
150 let all_services_ready =
151 upgraded_services
152 .iter()
153 .map(|service: &Arc<RwLock<dyn Service>>| async move {
154 service.write().await.ready().await?;
155 Ok(()) as Result<()>
156 });
157
158 futures::future::try_join_all(all_services_ready)
159 })
160 .await?;
161
162 Ok(())
163 })
164 .await
165 }
166
167 pub async fn start(&mut self) -> Result<()> {
168 self.services.retain(|weak| weak.strong_count() > 0);
169
170 progress::ProgressTracker::with_group("start", None, || {
171 let all_services_start = self.services.iter().filter_map(Weak::upgrade).map(
172 |service: Arc<RwLock<dyn Service>>| async move {
173 service.write().await.start().await?;
174 Ok(()) as Result<()>
175 },
176 );
177
178 futures::future::try_join_all(all_services_start)
179 })
180 .await?;
181 Ok(())
182 }
183
184 pub async fn stop(&mut self) -> Result<()> {
185 self.services.retain(|weak| weak.strong_count() > 0);
186
187 progress::ProgressTracker::with_group("stop", None, || {
188 let all_services_stop = self.services.iter().filter_map(Weak::upgrade).map(
189 |service: Arc<RwLock<dyn Service>>| async move {
190 service.write().await.stop().await?;
191 Ok(()) as Result<()>
192 },
193 );
194
195 futures::future::try_join_all(all_services_stop)
196 })
197 .await?;
198 Ok(())
199 }
200}
201
202impl Deployment {
203 pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(&mut self, host: F) -> Arc<T> {
204 let arc = Arc::new(host(self.next_host_id));
205 self.next_host_id += 1;
206
207 self.hosts.push(Arc::downgrade(&arc) as Weak<dyn Host>);
208 arc
209 }
210
211 pub fn add_service<T: Service + 'static>(
212 &mut self,
213 service: impl ServiceBuilder<Service = T>,
214 ) -> Arc<RwLock<T>> {
215 let arc = Arc::new(RwLock::new(service.build(self.next_service_id)));
216 self.next_service_id += 1;
217
218 self.services
219 .push(Arc::downgrade(&arc) as Weak<RwLock<dyn Service>>);
220 arc
221 }
222}
223
224#[buildstructor::buildstructor]
226impl Deployment {
227 #[builder(entry = "GcpComputeEngineHost", exit = "add")]
228 pub fn add_gcp_compute_engine_host(
229 &mut self,
230 project: String,
231 machine_type: String,
232 image: String,
233 region: String,
234 network: Arc<RwLock<GcpNetwork>>,
235 user: Option<String>,
236 ) -> Arc<GcpComputeEngineHost> {
237 self.add_host(|id| {
238 GcpComputeEngineHost::new(id, project, machine_type, image, region, network, user)
239 })
240 }
241
242 #[builder(entry = "AzureHost", exit = "add")]
243 pub fn add_azure_host(
244 &mut self,
245 project: String,
246 os_type: String, machine_size: String,
248 image: Option<HashMap<String, String>>,
249 region: String,
250 user: Option<String>,
251 ) -> Arc<AzureHost> {
252 self.add_host(|id| AzureHost::new(id, project, os_type, machine_size, image, region, user))
253 }
254}