hydro_deploy/
gcp.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::sync::{Arc, Mutex, OnceLock};
5
6use anyhow::Result;
7use async_trait::async_trait;
8use nanoid::nanoid;
9use serde_json::json;
10use tokio::sync::RwLock;
11
12use super::terraform::{TERRAFORM_ALPHABET, TerraformOutput, TerraformProvider};
13use super::{ClientStrategy, Host, HostTargetType, LaunchedHost, ResourceBatch, ResourceResult};
14use crate::ssh::LaunchedSshHost;
15use crate::{BaseServerStrategy, HostStrategyGetter, PortNetworkHint};
16
17pub struct LaunchedComputeEngine {
18    resource_result: Arc<ResourceResult>,
19    user: String,
20    pub internal_ip: String,
21    pub external_ip: Option<String>,
22}
23
24impl LaunchedSshHost for LaunchedComputeEngine {
25    fn get_external_ip(&self) -> Option<String> {
26        self.external_ip.clone()
27    }
28
29    fn get_internal_ip(&self) -> String {
30        self.internal_ip.clone()
31    }
32
33    fn get_cloud_provider(&self) -> String {
34        "GCP".to_string()
35    }
36
37    fn resource_result(&self) -> &Arc<ResourceResult> {
38        &self.resource_result
39    }
40
41    fn ssh_user(&self) -> &str {
42        self.user.as_str()
43    }
44}
45
/// A GCP VPC network that one or more Compute Engine hosts attach to.
#[derive(Debug)]
pub struct GcpNetwork {
    /// GCP project the network lives in.
    pub project: String,
    /// Name of a VPC to reuse. When `None`, a new network is declared on the
    /// first resource collection and its generated name is stored here.
    pub existing_vpc: Option<String>,
    /// Random suffix used to name the Terraform resources for this network.
    id: String,
}
52
53impl GcpNetwork {
54    pub fn new(project: impl Into<String>, existing_vpc: Option<String>) -> Self {
55        Self {
56            project: project.into(),
57            existing_vpc,
58            id: nanoid!(8, &TERRAFORM_ALPHABET),
59        }
60    }
61
62    fn collect_resources(&mut self, resource_batch: &mut ResourceBatch) -> String {
63        resource_batch
64            .terraform
65            .terraform
66            .required_providers
67            .insert(
68                "google".to_string(),
69                TerraformProvider {
70                    source: "hashicorp/google".to_string(),
71                    version: "4.53.1".to_string(),
72                },
73            );
74
75        let vpc_network = format!("hydro-vpc-network-{}", self.id);
76
77        if let Some(existing) = self.existing_vpc.as_ref() {
78            if resource_batch
79                .terraform
80                .resource
81                .get("google_compute_network")
82                .unwrap_or(&HashMap::new())
83                .contains_key(existing)
84            {
85                format!("google_compute_network.{existing}")
86            } else {
87                resource_batch
88                    .terraform
89                    .data
90                    .entry("google_compute_network".to_string())
91                    .or_default()
92                    .insert(
93                        vpc_network.clone(),
94                        json!({
95                            "name": existing,
96                            "project": self.project,
97                        }),
98                    );
99
100                format!("data.google_compute_network.{vpc_network}")
101            }
102        } else {
103            resource_batch
104                .terraform
105                .resource
106                .entry("google_compute_network".to_string())
107                .or_default()
108                .insert(
109                    vpc_network.clone(),
110                    json!({
111                        "name": vpc_network,
112                        "project": self.project,
113                        "auto_create_subnetworks": true
114                    }),
115                );
116
117            let firewall_entries = resource_batch
118                .terraform
119                .resource
120                .entry("google_compute_firewall".to_string())
121                .or_default();
122
123            // allow all VMs to communicate with each other over internal IPs
124            firewall_entries.insert(
125                format!("{vpc_network}-default-allow-internal"),
126                json!({
127                    "name": format!("{vpc_network}-default-allow-internal"),
128                    "project": self.project,
129                    "network": format!("${{google_compute_network.{vpc_network}.name}}"),
130                    "source_ranges": ["10.128.0.0/9"],
131                    "allow": [
132                        {
133                            "protocol": "tcp",
134                            "ports": ["0-65535"]
135                        },
136                        {
137                            "protocol": "udp",
138                            "ports": ["0-65535"]
139                        },
140                        {
141                            "protocol": "icmp"
142                        }
143                    ]
144                }),
145            );
146
147            // allow external pings to all VMs
148            firewall_entries.insert(
149                format!("{vpc_network}-default-allow-ping"),
150                json!({
151                    "name": format!("{vpc_network}-default-allow-ping"),
152                    "project": self.project,
153                    "network": format!("${{google_compute_network.{vpc_network}.name}}"),
154                    "source_ranges": ["0.0.0.0/0"],
155                    "allow": [
156                        {
157                            "protocol": "icmp"
158                        }
159                    ]
160                }),
161            );
162
163            self.existing_vpc = Some(vpc_network.clone());
164
165            format!("google_compute_network.{vpc_network}")
166        }
167    }
168}
169
170pub struct GcpComputeEngineHost {
171    /// ID from [`crate::Deployment::add_host`].
172    id: usize,
173
174    project: String,
175    machine_type: String,
176    image: String,
177    target_type: HostTargetType,
178    region: String,
179    network: Arc<RwLock<GcpNetwork>>,
180    user: Option<String>,
181    display_name: Option<String>,
182    pub launched: OnceLock<Arc<LaunchedComputeEngine>>, // TODO(mingwei): fix pub
183    external_ports: Mutex<Vec<u16>>,
184}
185
186impl Debug for GcpComputeEngineHost {
187    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188        f.write_fmt(format_args!(
189            "GcpComputeEngineHost({} ({:?}))",
190            self.id, &self.display_name
191        ))
192    }
193}
194
195impl GcpComputeEngineHost {
196    #[expect(clippy::too_many_arguments, reason = "used via builder pattern")]
197    pub fn new(
198        id: usize,
199        project: impl Into<String>,
200        machine_type: impl Into<String>,
201        image: impl Into<String>,
202        target_type: HostTargetType,
203        region: impl Into<String>,
204        network: Arc<RwLock<GcpNetwork>>,
205        user: Option<String>,
206        display_name: Option<String>,
207    ) -> Self {
208        Self {
209            id,
210            project: project.into(),
211            machine_type: machine_type.into(),
212            image: image.into(),
213            target_type,
214            region: region.into(),
215            network,
216            user,
217            display_name,
218            launched: OnceLock::new(),
219            external_ports: Mutex::new(Vec::new()),
220        }
221    }
222}
223
224#[async_trait]
225impl Host for GcpComputeEngineHost {
226    fn target_type(&self) -> HostTargetType {
227        self.target_type
228    }
229
230    fn request_port_base(&self, bind_type: &BaseServerStrategy) {
231        match bind_type {
232            BaseServerStrategy::UnixSocket => {}
233            BaseServerStrategy::InternalTcpPort(_) => {}
234            BaseServerStrategy::ExternalTcpPort(port) => {
235                let mut external_ports = self.external_ports.lock().unwrap();
236                if !external_ports.contains(port) {
237                    if self.launched.get().is_some() {
238                        todo!("Cannot adjust firewall after host has been launched");
239                    }
240                    external_ports.push(*port);
241                }
242            }
243        }
244    }
245
246    fn request_custom_binary(&self) {
247        self.request_port_base(&BaseServerStrategy::ExternalTcpPort(22));
248    }
249
250    fn id(&self) -> usize {
251        self.id
252    }
253
254    fn collect_resources(&self, resource_batch: &mut ResourceBatch) {
255        if self.launched.get().is_some() {
256            return;
257        }
258
259        let vpc_path = self
260            .network
261            .try_write()
262            .unwrap()
263            .collect_resources(resource_batch);
264
265        let project = self.project.as_str();
266
267        // first, we import the providers we need
268        resource_batch
269            .terraform
270            .terraform
271            .required_providers
272            .insert(
273                "google".to_string(),
274                TerraformProvider {
275                    source: "hashicorp/google".to_string(),
276                    version: "4.53.1".to_string(),
277                },
278            );
279
280        resource_batch
281            .terraform
282            .terraform
283            .required_providers
284            .insert(
285                "local".to_string(),
286                TerraformProvider {
287                    source: "hashicorp/local".to_string(),
288                    version: "2.3.0".to_string(),
289                },
290            );
291
292        resource_batch
293            .terraform
294            .terraform
295            .required_providers
296            .insert(
297                "tls".to_string(),
298                TerraformProvider {
299                    source: "hashicorp/tls".to_string(),
300                    version: "4.0.4".to_string(),
301                },
302            );
303
304        // we use a single SSH key for all VMs
305        resource_batch
306            .terraform
307            .resource
308            .entry("tls_private_key".to_string())
309            .or_default()
310            .insert(
311                "vm_instance_ssh_key".to_string(),
312                json!({
313                    "algorithm": "RSA",
314                    "rsa_bits": 4096
315                }),
316            );
317
318        resource_batch
319            .terraform
320            .resource
321            .entry("local_file".to_string())
322            .or_default()
323            .insert(
324                "vm_instance_ssh_key_pem".to_string(),
325                json!({
326                    "content": "${tls_private_key.vm_instance_ssh_key.private_key_pem}",
327                    "filename": ".ssh/vm_instance_ssh_key_pem",
328                    "file_permission": "0600"
329                }),
330            );
331
332        let vm_key = format!("vm-instance-{}", self.id);
333        let mut vm_name = format!("hydro-vm-instance-{}", nanoid!(8, &TERRAFORM_ALPHABET),);
334        // Name must match regex: (?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?), max length = 63 (61 + 1 a-z before and after)
335        if let Some(mut display_name) = self.display_name.clone() {
336            vm_name.push('-');
337            display_name = display_name
338                .replace("_", "-")
339                .replace(":", "-")
340                .to_lowercase();
341
342            // Keep the latter half of display_name if it is too long
343            let num_chars_to_cut = vm_name.len() + display_name.len() - 63;
344            if num_chars_to_cut > 0 {
345                display_name.drain(0..num_chars_to_cut);
346            }
347            vm_name.push_str(&display_name);
348        }
349
350        let mut tags = vec![];
351        let mut external_interfaces = vec![];
352
353        let external_ports = self.external_ports.lock().unwrap();
354        if external_ports.is_empty() {
355            external_interfaces.push(json!({ "network": format!("${{{vpc_path}.self_link}}") }));
356        } else {
357            external_interfaces.push(json!({
358                "network": format!("${{{vpc_path}.self_link}}"),
359                "access_config": [
360                    {
361                        "network_tier": "STANDARD"
362                    }
363                ]
364            }));
365
366            // open the external ports that were requested
367            let my_external_tags = external_ports.iter().map(|port| {
368                let rule_id = nanoid!(8, &TERRAFORM_ALPHABET);
369                let firewall_rule = resource_batch
370                    .terraform
371                    .resource
372                    .entry("google_compute_firewall".to_string())
373                    .or_default()
374                    .entry(format!("open-external-port-{}", port))
375                    .or_insert(json!({
376                        "name": format!("open-external-port-{}-{}", port, rule_id),
377                        "project": project,
378                        "network": format!("${{{vpc_path}.name}}"),
379                        "target_tags": [format!("open-external-port-tag-{}-{}", port, rule_id)],
380                        "source_ranges": ["0.0.0.0/0"],
381                        "allow": [
382                            {
383                                "protocol": "tcp",
384                                "ports": vec![port.to_string()]
385                            }
386                        ]
387                    }));
388
389                firewall_rule["target_tags"].as_array().unwrap()[0].clone()
390            });
391
392            tags.extend(my_external_tags);
393
394            resource_batch.terraform.output.insert(
395                format!("{vm_key}-public-ip"),
396                TerraformOutput {
397                    value: format!("${{google_compute_instance.{vm_key}.network_interface[0].access_config[0].nat_ip}}")
398                }
399            );
400        }
401        drop(external_ports); // Drop the lock as soon as possible.
402
403        let user = self.user.as_deref().unwrap_or("hydro");
404        resource_batch
405            .terraform
406            .resource
407            .entry("google_compute_instance".to_string())
408            .or_default()
409            .insert(
410                vm_key.clone(),
411                json!({
412                    "name": vm_name,
413                    "project": project,
414                    "machine_type": self.machine_type,
415                    "zone": self.region,
416                    "tags": tags,
417                    "metadata": {
418                        "ssh-keys": format!("{user}:${{tls_private_key.vm_instance_ssh_key.public_key_openssh}}")
419                    },
420                    "boot_disk": [
421                        {
422                            "initialize_params": [
423                                {
424                                    "image": self.image
425                                }
426                            ]
427                        }
428                    ],
429                    "network_interface": external_interfaces,
430                }),
431            );
432
433        resource_batch.terraform.output.insert(
434            format!("{vm_key}-internal-ip"),
435            TerraformOutput {
436                value: format!(
437                    "${{google_compute_instance.{vm_key}.network_interface[0].network_ip}}"
438                ),
439            },
440        );
441    }
442
443    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
444        self.launched
445            .get()
446            .map(|a| a.clone() as Arc<dyn LaunchedHost>)
447    }
448
449    fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
450        self.launched
451            .get_or_init(|| {
452                let id = self.id;
453
454                let internal_ip = resource_result
455                    .terraform
456                    .outputs
457                    .get(&format!("vm-instance-{id}-internal-ip"))
458                    .unwrap()
459                    .value
460                    .clone();
461
462                let external_ip = resource_result
463                    .terraform
464                    .outputs
465                    .get(&format!("vm-instance-{id}-public-ip"))
466                    .map(|v| v.value.clone());
467
468                Arc::new(LaunchedComputeEngine {
469                    resource_result: resource_result.clone(),
470                    user: self.user.as_ref().cloned().unwrap_or("hydro".to_string()),
471                    internal_ip,
472                    external_ip,
473                })
474            })
475            .clone()
476    }
477
478    fn strategy_as_server<'a>(
479        &'a self,
480        client_host: &dyn Host,
481        network_hint: PortNetworkHint,
482    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
483        if matches!(network_hint, PortNetworkHint::Auto)
484            && client_host.can_connect_to(ClientStrategy::UnixSocket(self.id))
485        {
486            Ok((
487                ClientStrategy::UnixSocket(self.id),
488                Box::new(|_| BaseServerStrategy::UnixSocket),
489            ))
490        } else if matches!(
491            network_hint,
492            PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
493        ) && client_host.can_connect_to(ClientStrategy::InternalTcpPort(self))
494        {
495            Ok((
496                ClientStrategy::InternalTcpPort(self),
497                Box::new(move |_| {
498                    BaseServerStrategy::InternalTcpPort(match network_hint {
499                        PortNetworkHint::Auto => None,
500                        PortNetworkHint::TcpPort(port) => port,
501                    })
502                }),
503            ))
504        } else if matches!(network_hint, PortNetworkHint::Auto)
505            && client_host.can_connect_to(ClientStrategy::ForwardedTcpPort(self))
506        {
507            Ok((
508                ClientStrategy::ForwardedTcpPort(self),
509                Box::new(|me| {
510                    me.downcast_ref::<GcpComputeEngineHost>()
511                        .unwrap()
512                        .request_port_base(&BaseServerStrategy::ExternalTcpPort(22)); // needed to forward
513                    BaseServerStrategy::InternalTcpPort(None)
514                }),
515            ))
516        } else {
517            anyhow::bail!("Could not find a strategy to connect to GCP instance")
518        }
519    }
520
521    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
522        match typ {
523            ClientStrategy::UnixSocket(id) => {
524                #[cfg(unix)]
525                {
526                    self.id == id
527                }
528
529                #[cfg(not(unix))]
530                {
531                    let _ = id;
532                    false
533                }
534            }
535            ClientStrategy::InternalTcpPort(target_host) => {
536                if let Some(gcp_target) =
537                    <dyn Any>::downcast_ref::<GcpComputeEngineHost>(target_host)
538                {
539                    self.project == gcp_target.project
540                } else {
541                    false
542                }
543            }
544            ClientStrategy::ForwardedTcpPort(_) => false,
545        }
546    }
547}