1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context as _, Result};
7use async_ssh2_russh::russh::client::{Config, Handler};
8use async_ssh2_russh::russh::{Disconnect, compression};
9use async_ssh2_russh::russh_sftp::protocol::{Status, StatusCode};
10use async_ssh2_russh::sftp::SftpError;
11use async_ssh2_russh::{AsyncChannel, AsyncSession, NoCheckHandler};
12use async_trait::async_trait;
13use hydro_deploy_integration::ServerBindConfig;
14use inferno::collapse::Collapse;
15use inferno::collapse::perf::Folder;
16use nanoid::nanoid;
17use tokio::fs::File;
18use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
19use tokio::net::TcpListener;
20use tokio::sync::{mpsc, oneshot};
21use tokio_stream::StreamExt;
22use tokio_stream::wrappers::LinesStream;
23use tokio_util::io::SyncIoBridge;
24
25use crate::progress::ProgressTracker;
26use crate::rust_crate::build::BuildOutput;
27use crate::rust_crate::flamegraph::handle_fold_data;
28use crate::rust_crate::tracing_options::TracingOptions;
29use crate::util::{PriorityBroadcast, async_retry, prioritized_broadcast};
30use crate::{BaseServerStrategy, LaunchedBinary, LaunchedHost, ResourceResult, TracingResults};
31
/// Relative path on the remote host of the perf data file.
///
/// `perf record -o` writes this file when tracing is enabled. It is later downloaded over SFTP
/// and replayed with `perf script -i`.
const PERF_OUTFILE: &str = "__profile.perf.data";
33
/// A binary running on a remote host over SSH, together with the handles used to talk to it.
struct LaunchedSshBinary {
    /// Held for the lifetime of the binary. Presumably this keeps the provisioned resources
    /// alive; TODO confirm.
    _resource_result: Arc<ResourceResult>,
    /// SSH session the binary runs in. It is `Some` from launch until `Drop` takes it to
    /// disconnect.
    session: Option<AsyncSession<NoCheckHandler>>,
    /// Channel executing the binary's command (possibly wrapped in `perf record`).
    channel: AsyncChannel,
    /// A background task writes lines sent here to the remote process's stdin.
    stdin_sender: mpsc::UnboundedSender<String>,
    /// Broadcast of the remote process's stdout lines.
    stdout_broadcast: PriorityBroadcast,
    /// Broadcast of the remote process's stderr lines.
    stderr_broadcast: PriorityBroadcast,
    /// Tracing configuration. When `Some`, `stop` downloads and folds the perf data.
    tracing: Option<TracingOptions>,
    /// Folded perf data, filled in by `stop` when tracing is enabled.
    tracing_results: Option<TracingResults>,
}
47
48#[async_trait]
49impl LaunchedBinary for LaunchedSshBinary {
50 fn stdin(&self) -> mpsc::UnboundedSender<String> {
51 self.stdin_sender.clone()
52 }
53
54 fn deploy_stdout(&self) -> oneshot::Receiver<String> {
55 self.stdout_broadcast.receive_priority()
56 }
57
58 fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
59 self.stdout_broadcast.receive(None)
60 }
61
62 fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
63 self.stderr_broadcast.receive(None)
64 }
65
66 fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
67 self.stdout_broadcast.receive(Some(prefix))
68 }
69
70 fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
71 self.stderr_broadcast.receive(Some(prefix))
72 }
73
74 fn tracing_results(&self) -> Option<&TracingResults> {
75 self.tracing_results.as_ref()
76 }
77
78 fn exit_code(&self) -> Option<i32> {
79 self.channel
81 .recv_exit_status()
82 .try_get()
83 .map(|&ec| ec as _)
84 .ok()
85 }
86
87 async fn wait(&mut self) -> Result<i32> {
88 self.channel.wait_close().await;
89 Ok(self
90 .channel
91 .recv_exit_status()
92 .try_get()
93 .map(|&ec| ec as _)?)
94 }
95
96 async fn stop(&mut self) -> Result<()> {
97 if !self.channel.is_closed() {
98 ProgressTracker::leaf("force stopping", async {
99 self.channel.eof().await?; self.channel.close().await?; self.channel.wait_close().await;
103 Result::<_>::Ok(())
104 })
105 .await?;
106 }
107
108 if let Some(tracing) = self.tracing.as_ref() {
110 let session = self.session.as_ref().unwrap();
111 if let Some(local_raw_perf) = tracing.perf_raw_outfile.as_ref() {
112 ProgressTracker::progress_leaf("downloading perf data", |progress, _| async move {
113 let sftp =
114 async_retry(&|| session.open_sftp(), 10, Duration::from_secs(1)).await?;
115
116 let mut remote_raw_perf = sftp.open(PERF_OUTFILE).await?;
117 let mut local_raw_perf = File::create(local_raw_perf).await?;
118
119 let total_size = remote_raw_perf.metadata().await?.size.unwrap();
120
121 use tokio::io::AsyncWriteExt;
122 let mut index = 0;
123 loop {
124 let mut buffer = [0; 16 * 1024];
125 let n = remote_raw_perf.read(&mut buffer).await?;
126 if n == 0 {
127 break;
128 }
129 local_raw_perf.write_all(&buffer[..n]).await?;
130 index += n;
131 progress(((index as f64 / total_size as f64) * 100.0) as u64);
132 }
133
134 Ok::<(), anyhow::Error>(())
135 })
136 .await?;
137 }
138
139 let script_channel = session.open_channel().await?;
140 let mut fold_er = Folder::from(tracing.fold_perf_options.clone().unwrap_or_default());
141
142 let fold_data = ProgressTracker::leaf("perf script & folding", async move {
143 let mut stderr_lines = script_channel.stderr().lines();
144 let stdout = script_channel.stdout();
145
146 let ((), fold_data, ()) = tokio::try_join!(
148 async move {
149 while let Ok(Some(s)) = stderr_lines.next_line().await {
151 ProgressTracker::eprintln(format!("[perf stderr] {s}"));
152 }
153 Result::<_>::Ok(())
154 },
155 async move {
156 tokio::task::spawn_blocking(move || {
158 let mut fold_data = Vec::new();
159 fold_er.collapse(
160 SyncIoBridge::new(BufReader::new(stdout)),
161 &mut fold_data,
162 )?;
163 Ok(fold_data)
164 })
165 .await?
166 },
167 async move {
168 script_channel
170 .exec(false, format!("perf script --symfs=/ -i {PERF_OUTFILE}"))
171 .await?;
172 Ok(())
173 },
174 )?;
175 Result::<_>::Ok(fold_data)
176 })
177 .await?;
178
179 self.tracing_results = Some(TracingResults {
180 folded_data: fold_data.clone(),
181 });
182
183 handle_fold_data(tracing, fold_data).await?;
184 };
185
186 Ok(())
187 }
188}
189
190impl Drop for LaunchedSshBinary {
191 fn drop(&mut self) {
192 if let Some(session) = self.session.take() {
193 tokio::task::block_in_place(|| {
194 tokio::runtime::Handle::current().block_on(session.disconnect(
195 Disconnect::ByApplication,
196 "",
197 "",
198 ))
199 })
200 .unwrap();
201 }
202 }
203}
204
205#[async_trait]
206pub trait LaunchedSshHost: Send + Sync {
207 fn get_internal_ip(&self) -> String;
208 fn get_external_ip(&self) -> Option<String>;
209 fn get_cloud_provider(&self) -> String;
210 fn resource_result(&self) -> &Arc<ResourceResult>;
211 fn ssh_user(&self) -> &str;
212
213 fn ssh_key_path(&self) -> PathBuf {
214 self.resource_result()
215 .terraform
216 .deployment_folder
217 .as_ref()
218 .unwrap()
219 .path()
220 .join(".ssh")
221 .join("vm_instance_ssh_key_pem")
222 }
223
224 async fn open_ssh_session(&self) -> Result<AsyncSession<NoCheckHandler>> {
225 let target_addr = SocketAddr::new(
226 self.get_external_ip()
227 .as_ref()
228 .context(
229 self.get_cloud_provider()
230 + " host must be configured with an external IP to launch binaries",
231 )?
232 .parse()
233 .unwrap(),
234 22,
235 );
236
237 let res = ProgressTracker::leaf(
238 format!(
239 "connecting to host @ {}",
240 self.get_external_ip().as_ref().unwrap()
241 ),
242 async_retry(
243 &|| async {
244 let mut config = Config::default();
245 config.preferred.compression = (&[
246 compression::ZLIB,
247 compression::ZLIB_LEGACY,
248 compression::NONE,
249 ])
250 .into();
251 AsyncSession::connect_publickey(
252 config,
253 target_addr,
254 self.ssh_user(),
255 self.ssh_key_path(),
256 )
257 .await
258 },
259 10,
260 Duration::from_secs(1),
261 ),
262 )
263 .await?;
264
265 Ok(res)
266 }
267}
268
269async fn create_channel<H>(session: &AsyncSession<H>) -> Result<AsyncChannel>
270where
271 H: 'static + Handler,
272{
273 async_retry(
274 &|| async {
275 Ok(tokio::time::timeout(Duration::from_secs(60), session.open_channel()).await??)
276 },
277 10,
278 Duration::from_secs(1),
279 )
280 .await
281}
282
283#[async_trait]
284impl<T: LaunchedSshHost> LaunchedHost for T {
285 fn base_server_config(&self, bind_type: &BaseServerStrategy) -> ServerBindConfig {
286 match bind_type {
287 BaseServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
288 BaseServerStrategy::InternalTcpPort(hint) => {
289 ServerBindConfig::TcpPort(self.get_internal_ip().clone(), *hint)
290 }
291 BaseServerStrategy::ExternalTcpPort(_) => todo!(),
292 }
293 }
294
295 async fn copy_binary(&self, binary: &BuildOutput) -> Result<()> {
296 let session = self.open_ssh_session().await?;
297
298 let sftp = async_retry(&|| session.open_sftp(), 10, Duration::from_secs(1)).await?;
299
300 let user = self.ssh_user();
301 let binary_path = format!("/home/{user}/hydro-{}", binary.unique_id());
303
304 if sftp.metadata(&binary_path).await.is_err() {
305 let random = nanoid!(8);
306 let temp_path = format!("/home/{user}/hydro-{random}");
307 let sftp = &sftp;
308
309 ProgressTracker::progress_leaf(
310 format!("uploading binary to {}", binary_path),
311 |set_progress, _| {
312 async move {
313 let mut created_file = sftp.create(&temp_path).await?;
314
315 let mut index = 0;
316 while index < binary.bin_data.len() {
317 let written = created_file
318 .write(
319 &binary.bin_data[index
320 ..std::cmp::min(index + 128 * 1024, binary.bin_data.len())],
321 )
322 .await?;
323 index += written;
324 set_progress(
325 ((index as f64 / binary.bin_data.len() as f64) * 100.0) as u64,
326 );
327 }
328 let mut orig_file_stat = sftp.metadata(&temp_path).await?;
329 orig_file_stat.permissions = Some(0o755); created_file.set_metadata(orig_file_stat).await?;
331 created_file.sync_all().await?;
332 drop(created_file);
333
334 match sftp.rename(&temp_path, binary_path).await {
335 Ok(_) => {}
336 Err(SftpError::Status(Status {
337 status_code: StatusCode::Failure, ..
339 })) => {
340 sftp.remove_file(temp_path).await?;
342 }
343 Err(e) => return Err(e.into()),
344 }
345
346 anyhow::Ok(())
347 }
348 },
349 )
350 .await?;
351 }
352 sftp.close().await?;
353
354 Ok(())
355 }
356
357 async fn launch_binary(
358 &self,
359 id: String,
360 binary: &BuildOutput,
361 args: &[String],
362 tracing: Option<TracingOptions>,
363 ) -> Result<Box<dyn LaunchedBinary>> {
364 let session = self.open_ssh_session().await?;
365
366 let user = self.ssh_user();
367 let binary_path = PathBuf::from(format!("/home/{user}/hydro-{}", binary.unique_id()));
368
369 let mut command = binary_path.to_str().unwrap().to_owned();
370 for arg in args {
371 command.push(' ');
372 command.push_str(&shell_escape::unix::escape(arg.into()))
373 }
374
375 if let Some(TracingOptions {
377 frequency,
378 setup_command,
379 ..
380 }) = tracing.clone()
381 {
382 let id_clone = id.clone();
383 ProgressTracker::leaf("install perf", async {
384 if let Some(setup_command) = setup_command {
386 let mut setup_channel = create_channel(&session).await?;
387 let (setup_stdout, setup_stderr) =
388 (setup_channel.stdout(), setup_channel.stderr());
389 setup_channel.exec(false, &*setup_command).await?;
390
391 let mut output_lines = LinesStream::new(setup_stdout.lines())
393 .merge(LinesStream::new(setup_stderr.lines()));
394 while let Some(line) = output_lines.next().await {
395 ProgressTracker::eprintln(format!(
396 "[{} install perf] {}",
397 id_clone,
398 line.unwrap()
399 ));
400 }
401
402 let exit_code = setup_channel.recv_exit_status().wait().await.copied();
403 setup_channel.wait_close().await;
404 if Some(0) != exit_code {
405 anyhow::bail!("Failed to install perf on remote host");
406 }
407 }
408 Ok(())
409 })
410 .await?;
411
412 command = format!(
415 "perf record -F {frequency} -e cycles:u --call-graph dwarf,65528 -o {PERF_OUTFILE} {command}",
416 );
417 }
418
419 let (channel, stdout, stderr) = ProgressTracker::leaf(
420 format!("launching binary {}", binary_path.display()),
421 async {
422 let channel = create_channel(&session).await?;
423 let (stdout, stderr) = (channel.stdout(), channel.stderr());
425 channel.exec(false, command).await?;
426 anyhow::Ok((channel, stdout, stderr))
427 },
428 )
429 .await?;
430
431 let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
432 let mut stdin = channel.stdin();
433
434 tokio::spawn(async move {
435 while let Some(line) = stdin_receiver.recv().await {
436 if stdin.write_all(line.as_bytes()).await.is_err() {
437 break;
438 }
439 stdin.flush().await.unwrap();
440 }
441 });
442
443 let id_clone = id.clone();
444 let stdout_broadcast = prioritized_broadcast(LinesStream::new(stdout.lines()), move |s| {
445 ProgressTracker::println(format!("[{id_clone}] {s}"));
446 });
447 let stderr_broadcast = prioritized_broadcast(LinesStream::new(stderr.lines()), move |s| {
448 ProgressTracker::println(format!("[{id} stderr] {s}"));
449 });
450
451 Ok(Box::new(LaunchedSshBinary {
452 _resource_result: self.resource_result().clone(),
453 session: Some(session),
454 channel,
455 stdin_sender,
456 stdout_broadcast,
457 stderr_broadcast,
458 tracing,
459 tracing_results: None,
460 }))
461 }
462
463 async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
464 let session = self.open_ssh_session().await?;
465
466 let local_port = TcpListener::bind("127.0.0.1:0").await?;
467 let local_addr = local_port.local_addr()?;
468
469 let internal_ip = addr.ip().to_string();
470 let port = addr.port();
471
472 tokio::spawn(async move {
473 #[expect(clippy::never_loop, reason = "tcp accept loop pattern")]
474 while let Ok((mut local_stream, _)) = local_port.accept().await {
475 let mut channel = session
476 .channel_open_direct_tcpip(internal_ip, port.into(), "127.0.0.1", 22)
477 .await
478 .unwrap()
479 .into_stream();
480 let _ = tokio::io::copy_bidirectional(&mut local_stream, &mut channel).await;
481 break;
482 }
485
486 ProgressTracker::println("[hydro] closing forwarded port");
487 });
488
489 Ok(local_addr)
490 }
491}