// hydro_deploy/gcp.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex, OnceLock};
3
4use anyhow::Result;
5use async_trait::async_trait;
6use nanoid::nanoid;
7use serde_json::json;
8use tokio::sync::RwLock;
9
10use super::terraform::{TERRAFORM_ALPHABET, TerraformOutput, TerraformProvider};
11use super::{
12    ClientStrategy, Host, HostTargetType, LaunchedHost, ResourceBatch, ResourceResult,
13    ServerStrategy,
14};
15use crate::HostStrategyGetter;
16use crate::ssh::LaunchedSshHost;
17
/// A GCP Compute Engine VM that has been provisioned, holding the
/// connection details extracted from the Terraform outputs.
pub struct LaunchedComputeEngine {
    // Shared result of the Terraform apply that created this VM.
    resource_result: Arc<ResourceResult>,
    // SSH username configured on the instance via metadata.
    user: String,
    /// Internal (VPC) IP address of the instance.
    pub internal_ip: String,
    /// External (NAT) IP address; `None` when no external ports were requested.
    pub external_ip: Option<String>,
}
24
25impl LaunchedSshHost for LaunchedComputeEngine {
26    fn get_external_ip(&self) -> Option<String> {
27        self.external_ip.clone()
28    }
29
30    fn get_internal_ip(&self) -> String {
31        self.internal_ip.clone()
32    }
33
34    fn get_cloud_provider(&self) -> String {
35        "GCP".to_string()
36    }
37
38    fn resource_result(&self) -> &Arc<ResourceResult> {
39        &self.resource_result
40    }
41
42    fn ssh_user(&self) -> &str {
43        self.user.as_str()
44    }
45}
46
/// Configuration for the GCP VPC network that deployed VMs attach to.
#[derive(Debug)]
pub struct GcpNetwork {
    /// GCP project that owns (or will own) the network.
    pub project: String,
    /// Name of an existing VPC to reuse. If `None`, a new VPC (with default
    /// firewall rules) is created during resource collection, and this field
    /// is then set to the new VPC's name.
    pub existing_vpc: Option<String>,
    // Random suffix keeping generated Terraform resource names unique.
    id: String,
}
53
54impl GcpNetwork {
55    pub fn new(project: impl Into<String>, existing_vpc: Option<String>) -> Self {
56        Self {
57            project: project.into(),
58            existing_vpc,
59            id: nanoid!(8, &TERRAFORM_ALPHABET),
60        }
61    }
62
63    fn collect_resources(&mut self, resource_batch: &mut ResourceBatch) -> String {
64        resource_batch
65            .terraform
66            .terraform
67            .required_providers
68            .insert(
69                "google".to_string(),
70                TerraformProvider {
71                    source: "hashicorp/google".to_string(),
72                    version: "4.53.1".to_string(),
73                },
74            );
75
76        let vpc_network = format!("hydro-vpc-network-{}", self.id);
77
78        if let Some(existing) = self.existing_vpc.as_ref() {
79            if resource_batch
80                .terraform
81                .resource
82                .get("google_compute_network")
83                .unwrap_or(&HashMap::new())
84                .contains_key(existing)
85            {
86                format!("google_compute_network.{existing}")
87            } else {
88                resource_batch
89                    .terraform
90                    .data
91                    .entry("google_compute_network".to_string())
92                    .or_default()
93                    .insert(
94                        vpc_network.clone(),
95                        json!({
96                            "name": existing,
97                            "project": self.project,
98                        }),
99                    );
100
101                format!("data.google_compute_network.{vpc_network}")
102            }
103        } else {
104            resource_batch
105                .terraform
106                .resource
107                .entry("google_compute_network".to_string())
108                .or_default()
109                .insert(
110                    vpc_network.clone(),
111                    json!({
112                        "name": vpc_network,
113                        "project": self.project,
114                        "auto_create_subnetworks": true
115                    }),
116                );
117
118            let firewall_entries = resource_batch
119                .terraform
120                .resource
121                .entry("google_compute_firewall".to_string())
122                .or_default();
123
124            // allow all VMs to communicate with each other over internal IPs
125            firewall_entries.insert(
126                format!("{vpc_network}-default-allow-internal"),
127                json!({
128                    "name": format!("{vpc_network}-default-allow-internal"),
129                    "project": self.project,
130                    "network": format!("${{google_compute_network.{vpc_network}.name}}"),
131                    "source_ranges": ["10.128.0.0/9"],
132                    "allow": [
133                        {
134                            "protocol": "tcp",
135                            "ports": ["0-65535"]
136                        },
137                        {
138                            "protocol": "udp",
139                            "ports": ["0-65535"]
140                        },
141                        {
142                            "protocol": "icmp"
143                        }
144                    ]
145                }),
146            );
147
148            // allow external pings to all VMs
149            firewall_entries.insert(
150                format!("{vpc_network}-default-allow-ping"),
151                json!({
152                    "name": format!("{vpc_network}-default-allow-ping"),
153                    "project": self.project,
154                    "network": format!("${{google_compute_network.{vpc_network}.name}}"),
155                    "source_ranges": ["0.0.0.0/0"],
156                    "allow": [
157                        {
158                            "protocol": "icmp"
159                        }
160                    ]
161                }),
162            );
163
164            self.existing_vpc = Some(vpc_network.clone());
165
166            format!("google_compute_network.{vpc_network}")
167        }
168    }
169}
170
/// A deployment host backed by a GCP Compute Engine VM instance.
pub struct GcpComputeEngineHost {
    /// ID from [`crate::Deployment::add_host`].
    id: usize,

    // GCP project that will own the VM.
    project: String,
    // GCP machine type for the VM.
    machine_type: String,
    // Boot disk image for the VM.
    image: String,
    // NOTE(review): despite the name, this is emitted as the Terraform
    // instance `zone` (see `collect_resources`) — confirm callers pass a zone.
    region: String,
    // Shared VPC description; its resources are collected lazily.
    network: Arc<RwLock<GcpNetwork>>,
    // Optional SSH username; defaults to "hydro" at provision time.
    user: Option<String>,
    pub launched: OnceLock<Arc<LaunchedComputeEngine>>, // TODO(mingwei): fix pub
    // External TCP ports requested before launch; each one results in a
    // firewall rule and forces allocation of an external IP.
    external_ports: Mutex<Vec<u16>>,
}
184
185impl GcpComputeEngineHost {
186    pub fn new(
187        id: usize,
188        project: impl Into<String>,
189        machine_type: impl Into<String>,
190        image: impl Into<String>,
191        region: impl Into<String>,
192        network: Arc<RwLock<GcpNetwork>>,
193        user: Option<String>,
194    ) -> Self {
195        Self {
196            id,
197            project: project.into(),
198            machine_type: machine_type.into(),
199            image: image.into(),
200            region: region.into(),
201            network,
202            user,
203            launched: OnceLock::new(),
204            external_ports: Mutex::new(Vec::new()),
205        }
206    }
207}
208
#[async_trait]
impl Host for GcpComputeEngineHost {
    /// GCP VMs in this module are always Linux targets.
    fn target_type(&self) -> HostTargetType {
        HostTargetType::Linux
    }

    /// Records that `bind_type` will be served from this host, recursing
    /// through composite strategies. External TCP ports are remembered so
    /// matching firewall rules can be emitted in `collect_resources`;
    /// requesting a new external port after launch is unsupported.
    fn request_port(&self, bind_type: &ServerStrategy) {
        match bind_type {
            ServerStrategy::UnixSocket => {}
            ServerStrategy::InternalTcpPort => {}
            ServerStrategy::ExternalTcpPort(port) => {
                let mut external_ports = self.external_ports.lock().unwrap();
                if !external_ports.contains(port) {
                    // Firewall rules are created at collection time, so a
                    // new port cannot be opened once the VM exists.
                    if self.launched.get().is_some() {
                        todo!("Cannot adjust firewall after host has been launched");
                    }
                    external_ports.push(*port);
                }
            }
            ServerStrategy::Demux(demux) => {
                for bind_type in demux.values() {
                    self.request_port(bind_type);
                }
            }
            ServerStrategy::Merge(merge) => {
                for bind_type in merge {
                    self.request_port(bind_type);
                }
            }
            ServerStrategy::Tagged(underlying, _) => {
                self.request_port(underlying);
            }
            ServerStrategy::Null => {}
        }
    }

    /// Copying a custom binary onto the VM requires SSH, so port 22 must be
    /// externally reachable.
    fn request_custom_binary(&self) {
        self.request_port(&ServerStrategy::ExternalTcpPort(22));
    }

    fn id(&self) -> usize {
        self.id
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Emits all Terraform resources needed for this VM into `resource_batch`:
    /// providers, a shared SSH key, firewall rules for requested external
    /// ports, the `google_compute_instance` itself, and outputs for its IPs.
    fn collect_resources(&self, resource_batch: &mut ResourceBatch) {
        // Already launched — resources were emitted in an earlier batch.
        if self.launched.get().is_some() {
            return;
        }

        // Make sure the VPC exists in the batch and get its Terraform address.
        // NOTE(review): `try_write().unwrap()` assumes the network lock is
        // never contended during collection — confirm with callers.
        let vpc_path = self
            .network
            .try_write()
            .unwrap()
            .collect_resources(resource_batch);

        let project = self.project.as_str();

        // first, we import the providers we need
        resource_batch
            .terraform
            .terraform
            .required_providers
            .insert(
                "google".to_string(),
                TerraformProvider {
                    source: "hashicorp/google".to_string(),
                    version: "4.53.1".to_string(),
                },
            );

        // `local` provider: used below to write the SSH private key to disk.
        resource_batch
            .terraform
            .terraform
            .required_providers
            .insert(
                "local".to_string(),
                TerraformProvider {
                    source: "hashicorp/local".to_string(),
                    version: "2.3.0".to_string(),
                },
            );

        // `tls` provider: generates the SSH keypair.
        resource_batch
            .terraform
            .terraform
            .required_providers
            .insert(
                "tls".to_string(),
                TerraformProvider {
                    source: "hashicorp/tls".to_string(),
                    version: "4.0.4".to_string(),
                },
            );

        // we use a single SSH key for all VMs
        resource_batch
            .terraform
            .resource
            .entry("tls_private_key".to_string())
            .or_default()
            .insert(
                "vm_instance_ssh_key".to_string(),
                json!({
                    "algorithm": "RSA",
                    "rsa_bits": 4096
                }),
            );

        // Persist the private key locally (mode 0600) so SSH can use it.
        resource_batch
            .terraform
            .resource
            .entry("local_file".to_string())
            .or_default()
            .insert(
                "vm_instance_ssh_key_pem".to_string(),
                json!({
                    "content": "${tls_private_key.vm_instance_ssh_key.private_key_pem}",
                    "filename": ".ssh/vm_instance_ssh_key_pem",
                    "file_permission": "0600"
                }),
            );

        // Stable Terraform key (per host ID) vs. randomized GCP-visible name.
        let vm_key = format!("vm-instance-{}", self.id);
        let vm_name = format!("hydro-vm-instance-{}", nanoid!(8, &TERRAFORM_ALPHABET));

        let mut tags = vec![];
        let mut external_interfaces = vec![];

        let external_ports = self.external_ports.lock().unwrap();
        if external_ports.is_empty() {
            // No external ports requested: attach to the VPC with no
            // external (NAT) IP.
            external_interfaces.push(json!({ "network": format!("${{{vpc_path}.self_link}}") }));
        } else {
            // `access_config` makes Terraform allocate an external IP.
            external_interfaces.push(json!({
                "network": format!("${{{vpc_path}.self_link}}"),
                "access_config": [
                    {
                        "network_tier": "STANDARD"
                    }
                ]
            }));

            // open the external ports that were requested
            let my_external_tags = external_ports.iter().map(|port| {
                let rule_id = nanoid!(8, &TERRAFORM_ALPHABET);
                // One firewall rule per port, shared across hosts in the
                // batch: `or_insert` keeps an existing rule (and its tag) if
                // another host already opened this port, in which case the
                // freshly generated `rule_id` is simply unused.
                let firewall_rule = resource_batch
                    .terraform
                    .resource
                    .entry("google_compute_firewall".to_string())
                    .or_default()
                    .entry(format!("open-external-port-{}", port))
                    .or_insert(json!({
                        "name": format!("open-external-port-{}-{}", port, rule_id),
                        "project": project,
                        "network": format!("${{{vpc_path}.name}}"),
                        "target_tags": [format!("open-external-port-tag-{}-{}", port, rule_id)],
                        "source_ranges": ["0.0.0.0/0"],
                        "allow": [
                            {
                                "protocol": "tcp",
                                "ports": vec![port.to_string()]
                            }
                        ]
                    }));

                // Tag the VM with the rule's target tag so the rule applies.
                firewall_rule["target_tags"].as_array().unwrap()[0].clone()
            });

            tags.extend(my_external_tags);

            // Expose the NAT IP as an output; `provision` reads it back.
            resource_batch.terraform.output.insert(
                format!("{vm_key}-public-ip"),
                TerraformOutput {
                    value: format!("${{google_compute_instance.{vm_key}.network_interface[0].access_config[0].nat_ip}}")
                }
            );
        }
        drop(external_ports); // Drop the lock as soon as possible.

        // Default SSH user is "hydro" unless overridden.
        let user = self.user.as_ref().cloned().unwrap_or("hydro".to_string());
        resource_batch
            .terraform
            .resource
            .entry("google_compute_instance".to_string())
            .or_default()
            .insert(
                vm_key.clone(),
                json!({
                    "name": vm_name,
                    "project": project,
                    "machine_type": self.machine_type,
                    "zone": self.region,
                    "tags": tags,
                    "metadata": {
                        // Authorize the shared key for `user` via VM metadata.
                        "ssh-keys": format!("{user}:${{tls_private_key.vm_instance_ssh_key.public_key_openssh}}")
                    },
                    "boot_disk": [
                        {
                            "initialize_params": [
                                {
                                    "image": self.image
                                }
                            ]
                        }
                    ],
                    "network_interface": external_interfaces,
                }),
            );

        // Internal IP output; always present, read back in `provision`.
        resource_batch.terraform.output.insert(
            format!("{vm_key}-internal-ip"),
            TerraformOutput {
                value: format!(
                    "${{google_compute_instance.{vm_key}.network_interface[0].network_ip}}"
                ),
            },
        );
    }

    /// The launched VM handle, if `provision` has already run.
    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
        self.launched
            .get()
            .map(|a| a.clone() as Arc<dyn LaunchedHost>)
    }

    /// Builds the launched-host handle from the Terraform outputs emitted in
    /// `collect_resources`. Idempotent: only the first call constructs it.
    fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
        self.launched
            .get_or_init(|| {
                let id = self.id;

                // The internal-IP output is always emitted, so a missing key
                // here indicates a collection/apply bug.
                let internal_ip = resource_result
                    .terraform
                    .outputs
                    .get(&format!("vm-instance-{id}-internal-ip"))
                    .unwrap()
                    .value
                    .clone();

                // Public IP exists only when external ports were requested.
                let external_ip = resource_result
                    .terraform
                    .outputs
                    .get(&format!("vm-instance-{id}-public-ip"))
                    .map(|v| v.value.clone());

                Arc::new(LaunchedComputeEngine {
                    resource_result: resource_result.clone(),
                    user: self.user.as_ref().cloned().unwrap_or("hydro".to_string()),
                    internal_ip,
                    external_ip,
                })
            })
            .clone()
    }

    /// Picks how `client_host` should connect to this host, preferring (in
    /// order): same-machine Unix socket, direct internal TCP, then a
    /// TCP port forwarded over SSH (which requires external port 22).
    fn strategy_as_server<'a>(
        &'a self,
        client_host: &dyn Host,
    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
        if client_host.can_connect_to(ClientStrategy::UnixSocket(self.id)) {
            Ok((
                ClientStrategy::UnixSocket(self.id),
                Box::new(|_| ServerStrategy::UnixSocket),
            ))
        } else if client_host.can_connect_to(ClientStrategy::InternalTcpPort(self)) {
            Ok((
                ClientStrategy::InternalTcpPort(self),
                Box::new(|_| ServerStrategy::InternalTcpPort),
            ))
        } else if client_host.can_connect_to(ClientStrategy::ForwardedTcpPort(self)) {
            Ok((
                ClientStrategy::ForwardedTcpPort(self),
                Box::new(|me| {
                    me.downcast_ref::<GcpComputeEngineHost>()
                        .unwrap()
                        .request_port(&ServerStrategy::ExternalTcpPort(22)); // needed to forward
                    ServerStrategy::InternalTcpPort
                }),
            ))
        } else {
            anyhow::bail!("Could not find a strategy to connect to GCP instance")
        }
    }

    /// Whether this host can act as the client side of `typ`.
    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
        match typ {
            ClientStrategy::UnixSocket(id) => {
                // Unix sockets only work between processes on the same host,
                // and only on unix targets.
                #[cfg(unix)]
                {
                    self.id == id
                }

                #[cfg(not(unix))]
                {
                    let _ = id;
                    false
                }
            }
            ClientStrategy::InternalTcpPort(target_host) => {
                // Internal IPs are reachable only within the same GCP project.
                // NOTE(review): this does not check that both hosts share the
                // same VPC network — confirm that is implied by project equality.
                if let Some(gcp_target) =
                    target_host.as_any().downcast_ref::<GcpComputeEngineHost>()
                {
                    self.project == gcp_target.project
                } else {
                    false
                }
            }
            // This host never dials a forwarded port as a client.
            ClientStrategy::ForwardedTcpPort(_) => false,
        }
    }
}