hydro_deploy/
terraform.rs

1use std::collections::HashMap;
2use std::io::{BufRead, BufReader};
3#[cfg(unix)]
4use std::os::unix::process::CommandExt;
5use std::process::{Child, ChildStdout, Command};
6use std::sync::{Arc, RwLock};
7
8use anyhow::{Context, Result, bail};
9use async_process::Stdio;
10use serde::{Deserialize, Serialize};
11use tempfile::TempDir;
12
13use super::progress::ProgressTracker;
14
/// The sixteen characters `1`-`9`, `0`, `a`-`f` (the lowercase hexadecimal digits,
/// with the decimal digits in keyboard order).
///
/// NOTE(review): not referenced in this file; presumably the alphabet used when
/// generating identifiers for Terraform resources -- confirm at the call sites.
pub static TERRAFORM_ALPHABET: [char; 16] = [
    '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', // decimal digits
    'a', 'b', 'c', 'd', 'e', 'f', // lowercase hex letters
];
18
19/// Keeps track of resources which may need to be cleaned up.
20#[derive(Default)]
21pub struct TerraformPool {
22    counter: u32,
23    active_applies: HashMap<u32, Arc<tokio::sync::RwLock<TerraformApply>>>,
24}
25
26impl TerraformPool {
27    fn create_apply(
28        &mut self,
29        deployment_folder: TempDir,
30    ) -> Result<(u32, Arc<tokio::sync::RwLock<TerraformApply>>)> {
31        let next_counter = self.counter;
32        self.counter += 1;
33
34        let mut apply_command = Command::new("terraform");
35
36        apply_command
37            .current_dir(deployment_folder.path())
38            .arg("apply")
39            .arg("-auto-approve")
40            .arg("-no-color")
41            .arg("-parallelism=128");
42
43        #[cfg(unix)]
44        {
45            apply_command.process_group(0);
46        }
47
48        let spawned_child = apply_command
49            .stdout(Stdio::piped())
50            .stderr(Stdio::piped())
51            .spawn()
52            .context("Failed to spawn `terraform`. Is it installed?")?;
53
54        let spawned_id = spawned_child.id();
55
56        let deployment = Arc::new(tokio::sync::RwLock::new(TerraformApply {
57            child: Some((spawned_id, Arc::new(RwLock::new(spawned_child)))),
58            deployment_folder: Some(deployment_folder),
59        }));
60
61        self.active_applies.insert(next_counter, deployment.clone());
62
63        Ok((next_counter, deployment))
64    }
65
66    fn drop_apply(&mut self, counter: u32) {
67        self.active_applies.remove(&counter);
68    }
69}
70
71impl Drop for TerraformPool {
72    fn drop(&mut self) {
73        for (_, apply) in self.active_applies.drain() {
74            debug_assert_eq!(Arc::strong_count(&apply), 1);
75        }
76    }
77}
78
79#[derive(Serialize, Deserialize)]
80pub struct TerraformBatch {
81    pub terraform: TerraformConfig,
82    #[serde(skip_serializing_if = "HashMap::is_empty")]
83    pub provider: HashMap<String, serde_json::Value>,
84    #[serde(skip_serializing_if = "HashMap::is_empty")]
85    pub data: HashMap<String, HashMap<String, serde_json::Value>>,
86    pub resource: HashMap<String, HashMap<String, serde_json::Value>>,
87    pub output: HashMap<String, TerraformOutput>,
88}
89
90impl Default for TerraformBatch {
91    fn default() -> TerraformBatch {
92        TerraformBatch {
93            terraform: TerraformConfig {
94                required_providers: HashMap::new(),
95            },
96            provider: HashMap::new(),
97            data: HashMap::new(),
98            resource: HashMap::new(),
99            output: HashMap::new(),
100        }
101    }
102}
103
104impl TerraformBatch {
105    pub async fn provision(self, pool: &mut TerraformPool) -> Result<TerraformResult> {
106        // Hack to quiet false-positive `clippy::needless_pass_by_ref_mut` on latest nightlies.
107        // TODO(mingwei): Remove this when it is no longer needed (current date 2023-08-30).
108        // https://github.com/rust-lang/rust-clippy/issues/11380
109        let pool = std::convert::identity(pool);
110
111        if self.terraform.required_providers.is_empty()
112            && self.resource.is_empty()
113            && self.data.is_empty()
114            && self.output.is_empty()
115        {
116            return Ok(TerraformResult {
117                outputs: HashMap::new(),
118                deployment_folder: None,
119            });
120        }
121
122        ProgressTracker::with_group("terraform", Some(1), || async {
123            let dothydro_folder = std::env::current_dir().unwrap().join(".hydro");
124            std::fs::create_dir_all(&dothydro_folder).unwrap();
125            let deployment_folder = tempfile::tempdir_in(dothydro_folder).unwrap();
126
127            std::fs::write(
128                deployment_folder.path().join("main.tf.json"),
129                serde_json::to_string(&self).unwrap(),
130            )
131            .unwrap();
132
133            if !Command::new("terraform")
134                .current_dir(deployment_folder.path())
135                .arg("init")
136                .stdout(Stdio::null())
137                .spawn()
138                .context("Failed to spawn `terraform`. Is it installed?")?
139                .wait()
140                .context("Failed to launch terraform init command")?
141                .success()
142            {
143                bail!("Failed to initialize terraform");
144            }
145
146            let (apply_id, apply) = pool.create_apply(deployment_folder)?;
147
148            let output = ProgressTracker::with_group(
149                "apply",
150                Some(self.resource.values().map(|r| r.len()).sum()),
151                || async { apply.write().await.output().await },
152            )
153            .await;
154            pool.drop_apply(apply_id);
155            output
156        })
157        .await
158    }
159}
160
161struct TerraformApply {
162    child: Option<(u32, Arc<RwLock<Child>>)>,
163    deployment_folder: Option<TempDir>,
164}
165
166async fn display_apply_outputs(stdout: &mut ChildStdout) {
167    let lines = BufReader::new(stdout).lines();
168    let mut waiting_for_result = HashMap::new();
169
170    for line in lines {
171        if let Ok(line) = line {
172            let mut split = line.split(':');
173            if let Some(first) = split.next()
174                && first.chars().all(|c| c != ' ')
175                && split.next().is_some()
176                && split.next().is_none()
177            {
178                if line.starts_with("Plan:")
179                    || line.starts_with("Outputs:")
180                    || line.contains(": Still creating...")
181                    || line.contains(": Reading...")
182                    || line.contains(": Still reading...")
183                    || line.contains(": Read complete after")
184                {
185                } else if line.ends_with(": Creating...") {
186                    let id = line.split(':').next().unwrap().trim().to_string();
187                    let (channel_send, channel_recv) = tokio::sync::oneshot::channel();
188                    waiting_for_result.insert(
189                        id.to_string(),
190                        (
191                            channel_send,
192                            tokio::task::spawn(ProgressTracker::leaf(id, async move {
193                                // `Err(RecvError)` means send side was dropped due to another error.
194                                // Ignore here to prevent spurious panic stack traces.
195                                let _result = channel_recv.await;
196                            })),
197                        ),
198                    );
199                } else if line.contains(": Creation complete after") {
200                    let id = line.split(':').next().unwrap().trim();
201                    let (sender, to_await) = waiting_for_result.remove(id).unwrap();
202                    let _ = sender.send(());
203                    to_await.await.unwrap();
204                } else {
205                    panic!("Unexpected from Terraform: {}", line);
206                }
207            }
208        } else {
209            break;
210        }
211    }
212}
213
/// Takes the child's stdout and echoes selected lines to stderr with a
/// `[terraform] ` prefix.
///
/// A line is echoed only if it contains exactly one `:` and there is no space
/// before it; every other line is discarded. Reading stops at end of stream or
/// at the first read error.
///
/// # Panics
/// Panics if the child's stdout was not piped or has already been taken.
fn filter_terraform_logs(child: &mut Child) {
    let stdout = child.stdout.take().unwrap();

    BufReader::new(stdout)
        .lines()
        .map_while(Result::ok)
        .filter(|line| {
            matches!(
                line.split_once(':'),
                Some((head, rest)) if !head.contains(' ') && !rest.contains(':')
            )
        })
        .for_each(|line| eprintln!("[terraform] {}", line));
}
231
232impl TerraformApply {
233    async fn output(&mut self) -> Result<TerraformResult> {
234        let (_, child) = self.child.as_ref().unwrap().clone();
235        let mut stdout = child.write().unwrap().stdout.take().unwrap();
236        let stderr = child.write().unwrap().stderr.take().unwrap();
237
238        let status = tokio::task::spawn_blocking(move || {
239            // it is okay for this thread to keep running even if the future is cancelled
240            child.write().unwrap().wait().unwrap()
241        });
242
243        let display_apply = display_apply_outputs(&mut stdout);
244        let stderr_loop = tokio::task::spawn_blocking(move || {
245            let mut lines = BufReader::new(stderr).lines();
246            while let Some(Ok(line)) = lines.next() {
247                ProgressTracker::println(format!("[terraform] {}", line));
248            }
249        });
250
251        let _ = futures::join!(display_apply, stderr_loop);
252
253        let status = status.await;
254
255        self.child = None;
256
257        if !status.unwrap().success() {
258            bail!("Terraform deployment failed, see `[terraform]` logs above.");
259        }
260
261        let mut output_command = Command::new("terraform");
262        output_command
263            .current_dir(self.deployment_folder.as_ref().unwrap().path())
264            .arg("output")
265            .arg("-json");
266
267        #[cfg(unix)]
268        {
269            output_command.process_group(0);
270        }
271
272        let output = output_command
273            .output()
274            .context("Failed to read Terraform outputs")?;
275
276        Ok(TerraformResult {
277            outputs: serde_json::from_slice(&output.stdout).unwrap(),
278            deployment_folder: self.deployment_folder.take(),
279        })
280    }
281}
282
283fn destroy_deployment(deployment_folder: TempDir) {
284    println!(
285        "Destroying terraform deployment at {}",
286        deployment_folder.path().display()
287    );
288
289    let mut destroy_command = Command::new("terraform");
290    destroy_command
291        .current_dir(deployment_folder.path())
292        .arg("destroy")
293        .arg("-auto-approve")
294        .arg("-no-color")
295        .arg("-parallelism=128")
296        .stdout(Stdio::piped());
297
298    #[cfg(unix)]
299    {
300        destroy_command.process_group(0);
301    }
302
303    let mut destroy_child = destroy_command
304        .spawn()
305        .expect("Failed to spawn terraform destroy command");
306
307    filter_terraform_logs(&mut destroy_child);
308
309    if !destroy_child
310        .wait()
311        .expect("Failed to destroy terraform deployment")
312        .success()
313    {
314        // prevent the folder from being deleted
315        let _ = deployment_folder.keep();
316        eprintln!("WARNING: failed to destroy terraform deployment");
317    }
318}
319
320impl Drop for TerraformApply {
321    fn drop(&mut self) {
322        if let Some((pid, child)) = self.child.take() {
323            #[cfg(unix)]
324            nix::sys::signal::kill(
325                nix::unistd::Pid::from_raw(pid as i32),
326                nix::sys::signal::Signal::SIGINT,
327            )
328            .unwrap();
329            #[cfg(not(unix))]
330            let _ = pid;
331
332            let mut child_write = child.write().unwrap();
333            if child_write.try_wait().unwrap().is_none() {
334                println!("Waiting for Terraform apply to finish...");
335                child_write.wait().unwrap();
336            }
337        }
338
339        if let Some(deployment_folder) = self.deployment_folder.take() {
340            destroy_deployment(deployment_folder);
341        }
342    }
343}
344
345#[derive(Serialize, Deserialize)]
346pub struct TerraformConfig {
347    pub required_providers: HashMap<String, TerraformProvider>,
348}
349
350#[derive(Serialize, Deserialize)]
351pub struct TerraformProvider {
352    pub source: String,
353    pub version: String,
354}
355
356#[derive(Serialize, Deserialize, Debug)]
357pub struct TerraformOutput {
358    pub value: String,
359}
360
361#[derive(Debug)]
362pub struct TerraformResult {
363    pub outputs: HashMap<String, TerraformOutput>,
364    /// `None` if no deployment was performed
365    pub deployment_folder: Option<TempDir>,
366}
367
368impl Drop for TerraformResult {
369    fn drop(&mut self) {
370        if let Some(deployment_folder) = self.deployment_folder.take() {
371            destroy_deployment(deployment_folder);
372        }
373    }
374}
375
376#[derive(Serialize, Deserialize)]
377pub struct TerraformResultOutput {
378    value: String,
379}