hydro_deploy/
gcp.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::sync::{Arc, Mutex, OnceLock};
5
6use anyhow::Result;
7use async_trait::async_trait;
8use nanoid::nanoid;
9use serde_json::json;
10use tokio::sync::RwLock;
11
12use super::terraform::{TERRAFORM_ALPHABET, TerraformOutput, TerraformProvider};
13use super::{ClientStrategy, Host, HostTargetType, LaunchedHost, ResourceBatch, ResourceResult};
14use crate::ssh::LaunchedSshHost;
15use crate::{BaseServerStrategy, HostStrategyGetter, PortNetworkHint};
16
/// A GCP Compute Engine VM that has been provisioned.
///
/// Instances are created by `GcpComputeEngineHost::provision` from the Terraform
/// outputs of a deployment and are exposed to the rest of the crate through the
/// [`LaunchedSshHost`] trait.
pub struct LaunchedComputeEngine {
    /// Shared handle to the provisioning results this host was built from.
    resource_result: Arc<ResourceResult>,
    /// User name reported by [`LaunchedSshHost::ssh_user`].
    user: String,
    /// Value of the `vm-instance-{id}-internal-ip` Terraform output.
    pub internal_ip: String,
    /// Value of the `vm-instance-{id}-public-ip` Terraform output, if that output exists.
    pub external_ip: Option<String>,
}
23
24impl LaunchedSshHost for LaunchedComputeEngine {
25    fn get_external_ip(&self) -> Option<String> {
26        self.external_ip.clone()
27    }
28
29    fn get_internal_ip(&self) -> String {
30        self.internal_ip.clone()
31    }
32
33    fn get_cloud_provider(&self) -> String {
34        "GCP".to_string()
35    }
36
37    fn resource_result(&self) -> &Arc<ResourceResult> {
38        &self.resource_result
39    }
40
41    fn ssh_user(&self) -> &str {
42        self.user.as_str()
43    }
44}
45
/// Description of the VPC network that GCP hosts attach to.
#[derive(Debug)]
pub struct GcpNetwork {
    /// GCP project the network and its firewall rules are declared under.
    pub project: String,
    /// Name of a VPC to reuse. When `None`, `collect_resources` declares a new network
    /// and then stores that network's generated name here.
    pub existing_vpc: Option<String>,
    /// Random 8-character suffix used to build the `hydro-vpc-network-{id}` name.
    id: String,
}
52
53impl GcpNetwork {
54    pub fn new(project: impl Into<String>, existing_vpc: Option<String>) -> Self {
55        Self {
56            project: project.into(),
57            existing_vpc,
58            id: nanoid!(8, &TERRAFORM_ALPHABET),
59        }
60    }
61
62    fn collect_resources(&mut self, resource_batch: &mut ResourceBatch) -> String {
63        resource_batch
64            .terraform
65            .terraform
66            .required_providers
67            .insert(
68                "google".to_string(),
69                TerraformProvider {
70                    source: "hashicorp/google".to_string(),
71                    version: "4.53.1".to_string(),
72                },
73            );
74
75        let vpc_network = format!("hydro-vpc-network-{}", self.id);
76
77        if let Some(existing) = self.existing_vpc.as_ref() {
78            if resource_batch
79                .terraform
80                .resource
81                .get("google_compute_network")
82                .unwrap_or(&HashMap::new())
83                .contains_key(existing)
84            {
85                format!("google_compute_network.{existing}")
86            } else {
87                resource_batch
88                    .terraform
89                    .data
90                    .entry("google_compute_network".to_string())
91                    .or_default()
92                    .insert(
93                        vpc_network.clone(),
94                        json!({
95                            "name": existing,
96                            "project": self.project,
97                        }),
98                    );
99
100                format!("data.google_compute_network.{vpc_network}")
101            }
102        } else {
103            resource_batch
104                .terraform
105                .resource
106                .entry("google_compute_network".to_string())
107                .or_default()
108                .insert(
109                    vpc_network.clone(),
110                    json!({
111                        "name": vpc_network,
112                        "project": self.project,
113                        "auto_create_subnetworks": true
114                    }),
115                );
116
117            let firewall_entries = resource_batch
118                .terraform
119                .resource
120                .entry("google_compute_firewall".to_string())
121                .or_default();
122
123            // allow all VMs to communicate with each other over internal IPs
124            firewall_entries.insert(
125                format!("{vpc_network}-default-allow-internal"),
126                json!({
127                    "name": format!("{vpc_network}-default-allow-internal"),
128                    "project": self.project,
129                    "network": format!("${{google_compute_network.{vpc_network}.name}}"),
130                    "source_ranges": ["10.128.0.0/9"],
131                    "allow": [
132                        {
133                            "protocol": "tcp",
134                            "ports": ["0-65535"]
135                        },
136                        {
137                            "protocol": "udp",
138                            "ports": ["0-65535"]
139                        },
140                        {
141                            "protocol": "icmp"
142                        }
143                    ]
144                }),
145            );
146
147            // allow external pings to all VMs
148            firewall_entries.insert(
149                format!("{vpc_network}-default-allow-ping"),
150                json!({
151                    "name": format!("{vpc_network}-default-allow-ping"),
152                    "project": self.project,
153                    "network": format!("${{google_compute_network.{vpc_network}.name}}"),
154                    "source_ranges": ["0.0.0.0/0"],
155                    "allow": [
156                        {
157                            "protocol": "icmp"
158                        }
159                    ]
160                }),
161            );
162
163            self.existing_vpc = Some(vpc_network.clone());
164
165            format!("google_compute_network.{vpc_network}")
166        }
167    }
168}
169
/// A GCP Compute Engine VM that can be declared through Terraform and then provisioned.
pub struct GcpComputeEngineHost {
    /// ID from [`crate::Deployment::add_host`].
    id: usize,

    /// GCP project the instance and its firewall rules are declared under.
    project: String,
    /// Value emitted as the instance's `machine_type`.
    machine_type: String,
    /// Boot disk image for the instance.
    image: String,
    /// Value emitted as the instance's `zone`.
    region: String,
    /// Network the instance is attached to.
    network: Arc<RwLock<GcpNetwork>>,
    /// SSH user name; `"hydro"` is used when this is `None`.
    user: Option<String>,
    /// Optional label that is sanitized and appended to the generated VM name.
    display_name: Option<String>,
    /// Set once by `provision`; while empty the host counts as not launched.
    pub launched: OnceLock<Arc<LaunchedComputeEngine>>, // TODO(mingwei): fix pub
    /// TCP ports that must be reachable externally, collected by `request_port_base`.
    external_ports: Mutex<Vec<u16>>,
}
184
185impl Debug for GcpComputeEngineHost {
186    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187        f.write_fmt(format_args!(
188            "GcpComputeEngineHost({} ({:?}))",
189            self.id, &self.display_name
190        ))
191    }
192}
193
194impl GcpComputeEngineHost {
195    #[expect(clippy::too_many_arguments, reason = "Mainly for internal use.")]
196    pub fn new(
197        id: usize,
198        project: impl Into<String>,
199        machine_type: impl Into<String>,
200        image: impl Into<String>,
201        region: impl Into<String>,
202        network: Arc<RwLock<GcpNetwork>>,
203        user: Option<String>,
204        display_name: Option<String>,
205    ) -> Self {
206        Self {
207            id,
208            project: project.into(),
209            machine_type: machine_type.into(),
210            image: image.into(),
211            region: region.into(),
212            network,
213            user,
214            display_name,
215            launched: OnceLock::new(),
216            external_ports: Mutex::new(Vec::new()),
217        }
218    }
219}
220
221#[async_trait]
222impl Host for GcpComputeEngineHost {
    /// Reports the platform binaries must target for this host; always
    /// [`HostTargetType::Linux`].
    fn target_type(&self) -> HostTargetType {
        HostTargetType::Linux
    }
226
227    fn request_port_base(&self, bind_type: &BaseServerStrategy) {
228        match bind_type {
229            BaseServerStrategy::UnixSocket => {}
230            BaseServerStrategy::InternalTcpPort(_) => {}
231            BaseServerStrategy::ExternalTcpPort(port) => {
232                let mut external_ports = self.external_ports.lock().unwrap();
233                if !external_ports.contains(port) {
234                    if self.launched.get().is_some() {
235                        todo!("Cannot adjust firewall after host has been launched");
236                    }
237                    external_ports.push(*port);
238                }
239            }
240        }
241    }
242
243    fn request_custom_binary(&self) {
244        self.request_port_base(&BaseServerStrategy::ExternalTcpPort(22));
245    }
246
    /// Returns the ID this host was constructed with.
    fn id(&self) -> usize {
        self.id
    }
250
251    fn collect_resources(&self, resource_batch: &mut ResourceBatch) {
252        if self.launched.get().is_some() {
253            return;
254        }
255
256        let vpc_path = self
257            .network
258            .try_write()
259            .unwrap()
260            .collect_resources(resource_batch);
261
262        let project = self.project.as_str();
263
264        // first, we import the providers we need
265        resource_batch
266            .terraform
267            .terraform
268            .required_providers
269            .insert(
270                "google".to_string(),
271                TerraformProvider {
272                    source: "hashicorp/google".to_string(),
273                    version: "4.53.1".to_string(),
274                },
275            );
276
277        resource_batch
278            .terraform
279            .terraform
280            .required_providers
281            .insert(
282                "local".to_string(),
283                TerraformProvider {
284                    source: "hashicorp/local".to_string(),
285                    version: "2.3.0".to_string(),
286                },
287            );
288
289        resource_batch
290            .terraform
291            .terraform
292            .required_providers
293            .insert(
294                "tls".to_string(),
295                TerraformProvider {
296                    source: "hashicorp/tls".to_string(),
297                    version: "4.0.4".to_string(),
298                },
299            );
300
301        // we use a single SSH key for all VMs
302        resource_batch
303            .terraform
304            .resource
305            .entry("tls_private_key".to_string())
306            .or_default()
307            .insert(
308                "vm_instance_ssh_key".to_string(),
309                json!({
310                    "algorithm": "RSA",
311                    "rsa_bits": 4096
312                }),
313            );
314
315        resource_batch
316            .terraform
317            .resource
318            .entry("local_file".to_string())
319            .or_default()
320            .insert(
321                "vm_instance_ssh_key_pem".to_string(),
322                json!({
323                    "content": "${tls_private_key.vm_instance_ssh_key.private_key_pem}",
324                    "filename": ".ssh/vm_instance_ssh_key_pem",
325                    "file_permission": "0600"
326                }),
327            );
328
329        let vm_key = format!("vm-instance-{}", self.id);
330        let mut vm_name = format!("hydro-vm-instance-{}", nanoid!(8, &TERRAFORM_ALPHABET),);
331        // Name must match regex: (?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?), max length = 63 (61 + 1 a-z before and after)
332        if let Some(mut display_name) = self.display_name.clone() {
333            vm_name.push('-');
334            display_name = display_name
335                .replace("_", "-")
336                .replace(":", "-")
337                .to_lowercase();
338
339            // Keep the latter half of display_name if it is too long
340            let num_chars_to_cut = vm_name.len() + display_name.len() - 63;
341            if num_chars_to_cut > 0 {
342                display_name.drain(0..num_chars_to_cut);
343            }
344            vm_name.push_str(&display_name);
345        }
346
347        let mut tags = vec![];
348        let mut external_interfaces = vec![];
349
350        let external_ports = self.external_ports.lock().unwrap();
351        if external_ports.is_empty() {
352            external_interfaces.push(json!({ "network": format!("${{{vpc_path}.self_link}}") }));
353        } else {
354            external_interfaces.push(json!({
355                "network": format!("${{{vpc_path}.self_link}}"),
356                "access_config": [
357                    {
358                        "network_tier": "STANDARD"
359                    }
360                ]
361            }));
362
363            // open the external ports that were requested
364            let my_external_tags = external_ports.iter().map(|port| {
365                let rule_id = nanoid!(8, &TERRAFORM_ALPHABET);
366                let firewall_rule = resource_batch
367                    .terraform
368                    .resource
369                    .entry("google_compute_firewall".to_string())
370                    .or_default()
371                    .entry(format!("open-external-port-{}", port))
372                    .or_insert(json!({
373                        "name": format!("open-external-port-{}-{}", port, rule_id),
374                        "project": project,
375                        "network": format!("${{{vpc_path}.name}}"),
376                        "target_tags": [format!("open-external-port-tag-{}-{}", port, rule_id)],
377                        "source_ranges": ["0.0.0.0/0"],
378                        "allow": [
379                            {
380                                "protocol": "tcp",
381                                "ports": vec![port.to_string()]
382                            }
383                        ]
384                    }));
385
386                firewall_rule["target_tags"].as_array().unwrap()[0].clone()
387            });
388
389            tags.extend(my_external_tags);
390
391            resource_batch.terraform.output.insert(
392                format!("{vm_key}-public-ip"),
393                TerraformOutput {
394                    value: format!("${{google_compute_instance.{vm_key}.network_interface[0].access_config[0].nat_ip}}")
395                }
396            );
397        }
398        drop(external_ports); // Drop the lock as soon as possible.
399
400        let user = self.user.as_deref().unwrap_or("hydro");
401        resource_batch
402            .terraform
403            .resource
404            .entry("google_compute_instance".to_string())
405            .or_default()
406            .insert(
407                vm_key.clone(),
408                json!({
409                    "name": vm_name,
410                    "project": project,
411                    "machine_type": self.machine_type,
412                    "zone": self.region,
413                    "tags": tags,
414                    "metadata": {
415                        "ssh-keys": format!("{user}:${{tls_private_key.vm_instance_ssh_key.public_key_openssh}}")
416                    },
417                    "boot_disk": [
418                        {
419                            "initialize_params": [
420                                {
421                                    "image": self.image
422                                }
423                            ]
424                        }
425                    ],
426                    "network_interface": external_interfaces,
427                }),
428            );
429
430        resource_batch.terraform.output.insert(
431            format!("{vm_key}-internal-ip"),
432            TerraformOutput {
433                value: format!(
434                    "${{google_compute_instance.{vm_key}.network_interface[0].network_ip}}"
435                ),
436            },
437        );
438    }
439
440    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
441        self.launched
442            .get()
443            .map(|a| a.clone() as Arc<dyn LaunchedHost>)
444    }
445
446    fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
447        self.launched
448            .get_or_init(|| {
449                let id = self.id;
450
451                let internal_ip = resource_result
452                    .terraform
453                    .outputs
454                    .get(&format!("vm-instance-{id}-internal-ip"))
455                    .unwrap()
456                    .value
457                    .clone();
458
459                let external_ip = resource_result
460                    .terraform
461                    .outputs
462                    .get(&format!("vm-instance-{id}-public-ip"))
463                    .map(|v| v.value.clone());
464
465                Arc::new(LaunchedComputeEngine {
466                    resource_result: resource_result.clone(),
467                    user: self.user.as_ref().cloned().unwrap_or("hydro".to_string()),
468                    internal_ip,
469                    external_ip,
470                })
471            })
472            .clone()
473    }
474
475    fn strategy_as_server<'a>(
476        &'a self,
477        client_host: &dyn Host,
478        network_hint: PortNetworkHint,
479    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
480        if matches!(network_hint, PortNetworkHint::Auto)
481            && client_host.can_connect_to(ClientStrategy::UnixSocket(self.id))
482        {
483            Ok((
484                ClientStrategy::UnixSocket(self.id),
485                Box::new(|_| BaseServerStrategy::UnixSocket),
486            ))
487        } else if matches!(
488            network_hint,
489            PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
490        ) && client_host.can_connect_to(ClientStrategy::InternalTcpPort(self))
491        {
492            Ok((
493                ClientStrategy::InternalTcpPort(self),
494                Box::new(move |_| {
495                    BaseServerStrategy::InternalTcpPort(match network_hint {
496                        PortNetworkHint::Auto => None,
497                        PortNetworkHint::TcpPort(port) => port,
498                    })
499                }),
500            ))
501        } else if matches!(network_hint, PortNetworkHint::Auto)
502            && client_host.can_connect_to(ClientStrategy::ForwardedTcpPort(self))
503        {
504            Ok((
505                ClientStrategy::ForwardedTcpPort(self),
506                Box::new(|me| {
507                    me.downcast_ref::<GcpComputeEngineHost>()
508                        .unwrap()
509                        .request_port_base(&BaseServerStrategy::ExternalTcpPort(22)); // needed to forward
510                    BaseServerStrategy::InternalTcpPort(None)
511                }),
512            ))
513        } else {
514            anyhow::bail!("Could not find a strategy to connect to GCP instance")
515        }
516    }
517
518    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
519        match typ {
520            ClientStrategy::UnixSocket(id) => {
521                #[cfg(unix)]
522                {
523                    self.id == id
524                }
525
526                #[cfg(not(unix))]
527                {
528                    let _ = id;
529                    false
530                }
531            }
532            ClientStrategy::InternalTcpPort(target_host) => {
533                if let Some(gcp_target) =
534                    <dyn Any>::downcast_ref::<GcpComputeEngineHost>(target_host)
535                {
536                    self.project == gcp_target.project
537                } else {
538                    false
539                }
540            }
541            ClientStrategy::ForwardedTcpPort(_) => false,
542        }
543    }
544}