//! `hydro_deploy` GCP backend (`gcp.rs`): provisions Google Compute Engine
//! instances and VPC networks via Terraform.

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex, OnceLock};
3
4use anyhow::Result;
5use async_trait::async_trait;
6use nanoid::nanoid;
7use serde_json::json;
8use tokio::sync::RwLock;
9
10use super::terraform::{TERRAFORM_ALPHABET, TerraformOutput, TerraformProvider};
11use super::{
12    ClientStrategy, Host, HostTargetType, LaunchedHost, ResourceBatch, ResourceResult,
13    ServerStrategy,
14};
15use crate::HostStrategyGetter;
16use crate::ssh::LaunchedSshHost;
17
/// A GCP Compute Engine VM that has been provisioned via Terraform.
///
/// Created by [`GcpComputeEngineHost::provision`] from the deployment's
/// Terraform outputs; connected to over SSH via its [`LaunchedSshHost`] impl.
pub struct LaunchedComputeEngine {
    // Terraform outputs for the deployment this VM belongs to.
    resource_result: Arc<ResourceResult>,
    // SSH username configured for the VM.
    user: String,
    /// The VM's internal (VPC) IP address, from the `-internal-ip` Terraform output.
    pub internal_ip: String,
    /// The VM's external IP address; `Some` only when the host requested
    /// external TCP ports (which causes the `-public-ip` output to be emitted).
    pub external_ip: Option<String>,
}
24
25impl LaunchedSshHost for LaunchedComputeEngine {
26    fn get_external_ip(&self) -> Option<String> {
27        self.external_ip.clone()
28    }
29
30    fn get_internal_ip(&self) -> String {
31        self.internal_ip.clone()
32    }
33
34    fn get_cloud_provider(&self) -> String {
35        "GCP".to_string()
36    }
37
38    fn resource_result(&self) -> &Arc<ResourceResult> {
39        &self.resource_result
40    }
41
42    fn ssh_user(&self) -> &str {
43        self.user.as_str()
44    }
45}
46
/// Describes the GCP VPC network that hosts will be attached to.
#[derive(Debug)]
pub struct GcpNetwork {
    /// GCP project the network lives in.
    pub project: String,
    /// Name of a pre-existing VPC to use. If `None`, a new VPC (with default
    /// internal-traffic and ping firewall rules) is created the first time
    /// resources are collected, and its name is stored back here.
    pub existing_vpc: Option<String>,
    // Random suffix used to make generated Terraform resource names unique.
    id: String,
}
53
54impl GcpNetwork {
55    pub fn new(project: impl Into<String>, existing_vpc: Option<String>) -> Self {
56        Self {
57            project: project.into(),
58            existing_vpc,
59            id: nanoid!(8, &TERRAFORM_ALPHABET),
60        }
61    }
62
63    fn collect_resources(&mut self, resource_batch: &mut ResourceBatch) -> String {
64        resource_batch
65            .terraform
66            .terraform
67            .required_providers
68            .insert(
69                "google".to_string(),
70                TerraformProvider {
71                    source: "hashicorp/google".to_string(),
72                    version: "4.53.1".to_string(),
73                },
74            );
75
76        let vpc_network = format!("hydro-vpc-network-{}", self.id);
77
78        if let Some(existing) = self.existing_vpc.as_ref() {
79            if resource_batch
80                .terraform
81                .resource
82                .get("google_compute_network")
83                .unwrap_or(&HashMap::new())
84                .contains_key(existing)
85            {
86                format!("google_compute_network.{existing}")
87            } else {
88                resource_batch
89                    .terraform
90                    .data
91                    .entry("google_compute_network".to_string())
92                    .or_default()
93                    .insert(
94                        vpc_network.clone(),
95                        json!({
96                            "name": existing,
97                            "project": self.project,
98                        }),
99                    );
100
101                format!("data.google_compute_network.{vpc_network}")
102            }
103        } else {
104            resource_batch
105                .terraform
106                .resource
107                .entry("google_compute_network".to_string())
108                .or_default()
109                .insert(
110                    vpc_network.clone(),
111                    json!({
112                        "name": vpc_network,
113                        "project": self.project,
114                        "auto_create_subnetworks": true
115                    }),
116                );
117
118            let firewall_entries = resource_batch
119                .terraform
120                .resource
121                .entry("google_compute_firewall".to_string())
122                .or_default();
123
124            // allow all VMs to communicate with each other over internal IPs
125            firewall_entries.insert(
126                format!("{vpc_network}-default-allow-internal"),
127                json!({
128                    "name": format!("{vpc_network}-default-allow-internal"),
129                    "project": self.project,
130                    "network": format!("${{google_compute_network.{vpc_network}.name}}"),
131                    "source_ranges": ["10.128.0.0/9"],
132                    "allow": [
133                        {
134                            "protocol": "tcp",
135                            "ports": ["0-65535"]
136                        },
137                        {
138                            "protocol": "udp",
139                            "ports": ["0-65535"]
140                        },
141                        {
142                            "protocol": "icmp"
143                        }
144                    ]
145                }),
146            );
147
148            // allow external pings to all VMs
149            firewall_entries.insert(
150                format!("{vpc_network}-default-allow-ping"),
151                json!({
152                    "name": format!("{vpc_network}-default-allow-ping"),
153                    "project": self.project,
154                    "network": format!("${{google_compute_network.{vpc_network}.name}}"),
155                    "source_ranges": ["0.0.0.0/0"],
156                    "allow": [
157                        {
158                            "protocol": "icmp"
159                        }
160                    ]
161                }),
162            );
163
164            self.existing_vpc = Some(vpc_network.clone());
165
166            format!("google_compute_network.{vpc_network}")
167        }
168    }
169}
170
/// A [`Host`] that provisions a Google Compute Engine VM through Terraform.
pub struct GcpComputeEngineHost {
    /// ID from [`crate::Deployment::add_host`].
    id: usize,

    // GCP project to create the instance in.
    project: String,
    // GCP machine type for the instance.
    machine_type: String,
    // Boot disk image for the instance.
    image: String,
    // NOTE(review): despite the name, this is emitted as the Terraform `zone`
    // field of the instance — callers should pass a zone, not a region.
    region: String,
    // VPC network shared (and lazily created) across hosts in this deployment.
    network: Arc<RwLock<GcpNetwork>>,
    // SSH username; defaults to "hydro" when `None`.
    user: Option<String>,
    // Optional `metadata_startup_script` for the instance.
    startup_script: Option<String>,
    pub launched: OnceLock<Arc<LaunchedComputeEngine>>, // TODO(mingwei): fix pub
    // External TCP ports requested before launch; firewall rules are emitted
    // for these in `collect_resources`.
    external_ports: Mutex<Vec<u16>>,
}
185
impl GcpComputeEngineHost {
    /// Creates a new (not yet provisioned) GCP Compute Engine host.
    ///
    /// `region` is emitted as the instance's Terraform `zone`; `user` defaults
    /// to "hydro" at provision time when `None`; `startup_script` becomes the
    /// instance's `metadata_startup_script`.
    #[expect(
        clippy::too_many_arguments,
        reason = "internal code called by builder elsewhere"
    )]
    pub fn new(
        id: usize,
        project: impl Into<String>,
        machine_type: impl Into<String>,
        image: impl Into<String>,
        region: impl Into<String>,
        network: Arc<RwLock<GcpNetwork>>,
        user: Option<String>,
        startup_script: Option<String>,
    ) -> Self {
        Self {
            id,
            project: project.into(),
            machine_type: machine_type.into(),
            image: image.into(),
            region: region.into(),
            network,
            user,
            startup_script,
            launched: OnceLock::new(),
            external_ports: Mutex::new(Vec::new()),
        }
    }
}
215
216#[async_trait]
217impl Host for GcpComputeEngineHost {
218    fn target_type(&self) -> HostTargetType {
219        HostTargetType::Linux
220    }
221
222    fn request_port(&self, bind_type: &ServerStrategy) {
223        match bind_type {
224            ServerStrategy::UnixSocket => {}
225            ServerStrategy::InternalTcpPort => {}
226            ServerStrategy::ExternalTcpPort(port) => {
227                let mut external_ports = self.external_ports.lock().unwrap();
228                if !external_ports.contains(port) {
229                    if self.launched.get().is_some() {
230                        todo!("Cannot adjust firewall after host has been launched");
231                    }
232                    external_ports.push(*port);
233                }
234            }
235            ServerStrategy::Demux(demux) => {
236                for bind_type in demux.values() {
237                    self.request_port(bind_type);
238                }
239            }
240            ServerStrategy::Merge(merge) => {
241                for bind_type in merge {
242                    self.request_port(bind_type);
243                }
244            }
245            ServerStrategy::Tagged(underlying, _) => {
246                self.request_port(underlying);
247            }
248            ServerStrategy::Null => {}
249        }
250    }
251
252    fn request_custom_binary(&self) {
253        self.request_port(&ServerStrategy::ExternalTcpPort(22));
254    }
255
256    fn id(&self) -> usize {
257        self.id
258    }
259
260    fn as_any(&self) -> &dyn std::any::Any {
261        self
262    }
263
264    fn collect_resources(&self, resource_batch: &mut ResourceBatch) {
265        if self.launched.get().is_some() {
266            return;
267        }
268
269        let vpc_path = self
270            .network
271            .try_write()
272            .unwrap()
273            .collect_resources(resource_batch);
274
275        let project = self.project.as_str();
276
277        // first, we import the providers we need
278        resource_batch
279            .terraform
280            .terraform
281            .required_providers
282            .insert(
283                "google".to_string(),
284                TerraformProvider {
285                    source: "hashicorp/google".to_string(),
286                    version: "4.53.1".to_string(),
287                },
288            );
289
290        resource_batch
291            .terraform
292            .terraform
293            .required_providers
294            .insert(
295                "local".to_string(),
296                TerraformProvider {
297                    source: "hashicorp/local".to_string(),
298                    version: "2.3.0".to_string(),
299                },
300            );
301
302        resource_batch
303            .terraform
304            .terraform
305            .required_providers
306            .insert(
307                "tls".to_string(),
308                TerraformProvider {
309                    source: "hashicorp/tls".to_string(),
310                    version: "4.0.4".to_string(),
311                },
312            );
313
314        // we use a single SSH key for all VMs
315        resource_batch
316            .terraform
317            .resource
318            .entry("tls_private_key".to_string())
319            .or_default()
320            .insert(
321                "vm_instance_ssh_key".to_string(),
322                json!({
323                    "algorithm": "RSA",
324                    "rsa_bits": 4096
325                }),
326            );
327
328        resource_batch
329            .terraform
330            .resource
331            .entry("local_file".to_string())
332            .or_default()
333            .insert(
334                "vm_instance_ssh_key_pem".to_string(),
335                json!({
336                    "content": "${tls_private_key.vm_instance_ssh_key.private_key_pem}",
337                    "filename": ".ssh/vm_instance_ssh_key_pem",
338                    "file_permission": "0600"
339                }),
340            );
341
342        let vm_key = format!("vm-instance-{}", self.id);
343        let vm_name = format!("hydro-vm-instance-{}", nanoid!(8, &TERRAFORM_ALPHABET));
344
345        let mut tags = vec![];
346        let mut external_interfaces = vec![];
347
348        let external_ports = self.external_ports.lock().unwrap();
349        if external_ports.is_empty() {
350            external_interfaces.push(json!({ "network": format!("${{{vpc_path}.self_link}}") }));
351        } else {
352            external_interfaces.push(json!({
353                "network": format!("${{{vpc_path}.self_link}}"),
354                "access_config": [
355                    {
356                        "network_tier": "STANDARD"
357                    }
358                ]
359            }));
360
361            // open the external ports that were requested
362            let my_external_tags = external_ports.iter().map(|port| {
363                let rule_id = nanoid!(8, &TERRAFORM_ALPHABET);
364                let firewall_rule = resource_batch
365                    .terraform
366                    .resource
367                    .entry("google_compute_firewall".to_string())
368                    .or_default()
369                    .entry(format!("open-external-port-{}", port))
370                    .or_insert(json!({
371                        "name": format!("open-external-port-{}-{}", port, rule_id),
372                        "project": project,
373                        "network": format!("${{{vpc_path}.name}}"),
374                        "target_tags": [format!("open-external-port-tag-{}-{}", port, rule_id)],
375                        "source_ranges": ["0.0.0.0/0"],
376                        "allow": [
377                            {
378                                "protocol": "tcp",
379                                "ports": vec![port.to_string()]
380                            }
381                        ]
382                    }));
383
384                firewall_rule["target_tags"].as_array().unwrap()[0].clone()
385            });
386
387            tags.extend(my_external_tags);
388
389            resource_batch.terraform.output.insert(
390                format!("{vm_key}-public-ip"),
391                TerraformOutput {
392                    value: format!("${{google_compute_instance.{vm_key}.network_interface[0].access_config[0].nat_ip}}")
393                }
394            );
395        }
396        drop(external_ports); // Drop the lock as soon as possible.
397
398        let user = self.user.as_ref().cloned().unwrap_or("hydro".to_string());
399        resource_batch
400            .terraform
401            .resource
402            .entry("google_compute_instance".to_string())
403            .or_default()
404            .insert(
405                vm_key.clone(),
406                json!({
407                    "name": vm_name,
408                    "project": project,
409                    "machine_type": self.machine_type,
410                    "zone": self.region,
411                    "tags": tags,
412                    "metadata": {
413                        "ssh-keys": format!("{user}:${{tls_private_key.vm_instance_ssh_key.public_key_openssh}}")
414                    },
415                    "boot_disk": [
416                        {
417                            "initialize_params": [
418                                {
419                                    "image": self.image
420                                }
421                            ]
422                        }
423                    ],
424                    "network_interface": external_interfaces,
425                    "metadata_startup_script": self.startup_script,
426                }),
427            );
428
429        resource_batch.terraform.output.insert(
430            format!("{vm_key}-internal-ip"),
431            TerraformOutput {
432                value: format!(
433                    "${{google_compute_instance.{vm_key}.network_interface[0].network_ip}}"
434                ),
435            },
436        );
437    }
438
439    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
440        self.launched
441            .get()
442            .map(|a| a.clone() as Arc<dyn LaunchedHost>)
443    }
444
445    fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
446        self.launched
447            .get_or_init(|| {
448                let id = self.id;
449
450                let internal_ip = resource_result
451                    .terraform
452                    .outputs
453                    .get(&format!("vm-instance-{id}-internal-ip"))
454                    .unwrap()
455                    .value
456                    .clone();
457
458                let external_ip = resource_result
459                    .terraform
460                    .outputs
461                    .get(&format!("vm-instance-{id}-public-ip"))
462                    .map(|v| v.value.clone());
463
464                Arc::new(LaunchedComputeEngine {
465                    resource_result: resource_result.clone(),
466                    user: self.user.as_ref().cloned().unwrap_or("hydro".to_string()),
467                    internal_ip,
468                    external_ip,
469                })
470            })
471            .clone()
472    }
473
474    fn strategy_as_server<'a>(
475        &'a self,
476        client_host: &dyn Host,
477    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
478        if client_host.can_connect_to(ClientStrategy::UnixSocket(self.id)) {
479            Ok((
480                ClientStrategy::UnixSocket(self.id),
481                Box::new(|_| ServerStrategy::UnixSocket),
482            ))
483        } else if client_host.can_connect_to(ClientStrategy::InternalTcpPort(self)) {
484            Ok((
485                ClientStrategy::InternalTcpPort(self),
486                Box::new(|_| ServerStrategy::InternalTcpPort),
487            ))
488        } else if client_host.can_connect_to(ClientStrategy::ForwardedTcpPort(self)) {
489            Ok((
490                ClientStrategy::ForwardedTcpPort(self),
491                Box::new(|me| {
492                    me.downcast_ref::<GcpComputeEngineHost>()
493                        .unwrap()
494                        .request_port(&ServerStrategy::ExternalTcpPort(22)); // needed to forward
495                    ServerStrategy::InternalTcpPort
496                }),
497            ))
498        } else {
499            anyhow::bail!("Could not find a strategy to connect to GCP instance")
500        }
501    }
502
503    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
504        match typ {
505            ClientStrategy::UnixSocket(id) => {
506                #[cfg(unix)]
507                {
508                    self.id == id
509                }
510
511                #[cfg(not(unix))]
512                {
513                    let _ = id;
514                    false
515                }
516            }
517            ClientStrategy::InternalTcpPort(target_host) => {
518                if let Some(gcp_target) =
519                    target_host.as_any().downcast_ref::<GcpComputeEngineHost>()
520                {
521                    self.project == gcp_target.project
522                } else {
523                    false
524                }
525            }
526            ClientStrategy::ForwardedTcpPort(_) => false,
527        }
528    }
529}