// hydro_lang/runtime_support/launch.rs

use std::cell::RefCell;
use std::collections::HashMap;
#[cfg(feature = "runtime_measure")]
use std::panic::AssertUnwindSafe;

#[cfg(feature = "runtime_measure")]
use dfir_rs::futures::FutureExt;
use dfir_rs::scheduled::graph::Dfir;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
pub use hydro_deploy_integration::*;
#[cfg(feature = "runtime_measure")]
#[cfg(target_os = "linux")]
use procfs::WithCurrentSystemInfo;
use serde::de::DeserializeOwned;
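
/// Runs a compiled DFIR graph, driving it until the parent process sends a
/// `stop` line on stdin (see [`launch_flow`]) or the flow itself completes.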
#[cfg(not(feature = "runtime_measure"))]
pub async fn run(flow: Dfir<'_>) {
    launch_flow(flow).await;
}
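
/// Runs a compiled DFIR graph without watching stdin for a `stop` message,
/// intended for containerized deployments where the process is torn down
/// externally.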
pub async fn run_containerized(flow: Dfir<'_>) {
    launch_flow_containerized(flow).await;
}
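
/// Runs a compiled DFIR graph and, after it finishes (or panics), prints the
/// process's CPU usage so that deployment tooling can scrape it from stdout.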
#[cfg(feature = "runtime_measure")]
pub async fn run(flow: Dfir<'_>) {
    let res = AssertUnwindSafe(launch_flow(flow)).catch_unwind().await;
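
    // On Linux, compute CPU usage from `/proc/self/stat`: user + system time
    // (in clock ticks) converted to milliseconds, divided by the wall-clock
    // time since the process started.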
    #[cfg(target_os = "linux")]
    {
        let me = procfs::process::Process::myself().unwrap();
        let stat = me.stat().unwrap();
        let sysinfo = procfs::current_system_info();

        let start_time = stat.starttime().get().unwrap();
        let curr_time = chrono::Local::now();
        let elapsed_time = curr_time - start_time;

        let seconds_spent = (stat.utime + stat.stime) as f32 / sysinfo.ticks_per_second() as f32;
        let run_time = chrono::Duration::milliseconds((seconds_spent * 1000.0) as i64);

        let percent_cpu_use =
            run_time.num_milliseconds() as f32 / elapsed_time.num_milliseconds() as f32;
        let user_time = chrono::Duration::milliseconds(
            (stat.utime as f32 / sysinfo.ticks_per_second() as f32 * 1000.0) as i64,
        );
        let user_cpu_use =
            user_time.num_milliseconds() as f32 / elapsed_time.num_milliseconds() as f32;
        let system_time = chrono::Duration::milliseconds(
            (stat.stime as f32 / sysinfo.ticks_per_second() as f32 * 1000.0) as i64,
        );
        let system_cpu_use =
            system_time.num_milliseconds() as f32 / elapsed_time.num_milliseconds() as f32;
        println!(
            "{} Total {:.4}%, User {:.4}%, System {:.4}%",
            option_env!("HYDRO_RUNTIME_MEASURE_CPU_PREFIX").unwrap_or("CPU:"),
            percent_cpu_use,
            user_cpu_use,
            system_cpu_use
        );
    }
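
    // procfs is Linux-only; elsewhere report a fixed placeholder (100% total
    // and user, 0% system) so the output format stays the same.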
    #[cfg(not(target_os = "linux"))]
    {
        let user_cpu_use = 100.0;

        println!(
            "{} Total {:.4}%, User {:.4}%, System {:.4}%",
            option_env!("HYDRO_RUNTIME_MEASURE_CPU_PREFIX").unwrap_or("CPU:"),
            user_cpu_use,
            user_cpu_use,
            0.0
        );
    }

    res.unwrap();
}
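
/// Wires up a deployed process: reads the port configuration from stdin via
/// [`init_no_ack_start`], passes a reference to the resulting [`DeployPorts`]
/// to the given expression (typically a closure) to build the DFIR graph,
/// acknowledges startup with `ack start`, and then runs the graph with
/// [`launch_flow`].
///
/// A minimal usage sketch; `build_flow` is a hypothetical function that
/// constructs the process's `Dfir` graph from the ports:
///
/// ```rust,ignore
/// // `build_flow` is a hypothetical constructor for this process's DFIR graph.
/// hydro_lang::launch!(|ports| build_flow(ports)).await;
/// ```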
#[macro_export]
macro_rules! launch {
    ($f:expr) => {
        async {
            let ports = $crate::runtime_support::launch::init_no_ack_start().await;
            let flow = $f(&ports);

            println!("ack start");

            $crate::runtime_support::launch::launch_flow(flow).await
        }
    };
}

pub use crate::launch;
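
/// Drives a DFIR graph to completion on a Tokio `LocalSet`, while a blocking
/// task watches stdin: a line starting with `stop` (sent by the deployment
/// driver) shuts the process down even if the flow itself never terminates.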
pub async fn launch_flow(mut flow: Dfir<'_>) {
    let stop = tokio::sync::oneshot::channel();
    tokio::task::spawn_blocking(|| {
        let mut line = String::new();
        std::io::stdin().read_line(&mut line).unwrap();
        if line.starts_with("stop") {
            stop.0.send(()).unwrap();
        } else {
            eprintln!("Unexpected stdin input: {:?}", line);
        }
    });

    let local_set = tokio::task::LocalSet::new();
    let flow = local_set.run_until(flow.run());

    tokio::select! {
        _ = stop.1 => {},
        _ = flow => {}
    }
}
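
/// Drives a DFIR graph to completion on a Tokio `LocalSet` without any stdin
/// control channel.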
pub async fn launch_flow_containerized(mut flow: Dfir<'_>) {
    let local_set = tokio::task::LocalSet::new();
    local_set.run_until(flow.run()).await;
}
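
/// Performs the deployment handshake with the parent process over
/// stdin/stdout and returns the connected [`DeployPorts`]:
///
/// 1. Reads one line of JSON (an [`InitConfig`]) describing the ports to bind.
/// 2. Binds them and replies with `ready: <bound ports as JSON>`.
/// 3. Waits for a `start: <peer ports as JSON>` line, then connects to the
///    listed peers and accepts the inbound connections.
///
/// Unlike [`init`], this does not print the final `ack start` line; the caller
/// (for example the `launch!` macro) is expected to do so after constructing
/// its dataflow.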
pub async fn init_no_ack_start<T: DeserializeOwned + Default>() -> DeployPorts<T> {
    let mut input = String::new();
    std::io::stdin().read_line(&mut input).unwrap();
    let trimmed = input.trim();

    let bind_config = serde_json::from_str::<InitConfig>(trimmed).unwrap();

    let mut bind_results: HashMap<String, ServerPort> = HashMap::new();
    let mut binds = HashMap::new();
    for (name, config) in bind_config.0 {
        let bound = config.bind().await;
        bind_results.insert(name.clone(), bound.server_port());
        binds.insert(name.clone(), bound);
    }

    let bind_serialized = serde_json::to_string(&bind_results).unwrap();
    println!("ready: {bind_serialized}");
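
    // Wait for the parent to reply with `start: <JSON>` listing the peer ports
    // this process should connect to as a client.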
    let mut start_buf = String::new();
    std::io::stdin().read_line(&mut start_buf).unwrap();
    let connection_defns = if start_buf.starts_with("start: ") {
        serde_json::from_str::<HashMap<String, ServerPort>>(
            start_buf.trim_start_matches("start: ").trim(),
        )
        .unwrap()
    } else {
        panic!("expected start");
    };
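
    // Establish all connections concurrently: dial each peer listed in the
    // `start` message and accept an inbound connection on each bound port.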
    let (client_conns, server_conns) = futures::join!(
        connection_defns
            .into_iter()
            .map(|(name, defn)| async move { (name, Connection::AsClient(defn.connect().await)) })
            .collect::<FuturesUnordered<_>>()
            .collect::<Vec<_>>(),
        binds
            .into_iter()
            .map(
                |(name, defn)| async move { (name, Connection::AsServer(accept_bound(defn).await)) }
            )
            .collect::<FuturesUnordered<_>>()
            .collect::<Vec<_>>()
    );

    let all_connected = client_conns
        .into_iter()
        .chain(server_conns.into_iter())
        .collect();

    DeployPorts {
        ports: RefCell::new(all_connected),
        meta: bind_config
            .1
            .map(|b| serde_json::from_str(&b).unwrap())
            .unwrap_or_default(),
    }
}
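
/// Like [`init_no_ack_start`], but immediately prints `ack start` once the
/// ports are connected.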
pub async fn init<T: DeserializeOwned + Default>() -> DeployPorts<T> {
    let ret = init_no_ack_start::<T>().await;

    println!("ack start");

    ret
}