hydro_lang/runtime_support/
launch.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3#[cfg(feature = "runtime_measure")]
4use std::panic::AssertUnwindSafe;
5
6#[cfg(feature = "runtime_measure")]
7use dfir_rs::futures::FutureExt;
8use dfir_rs::scheduled::graph::Dfir;
9use futures::StreamExt;
10use futures::stream::FuturesUnordered;
11pub use hydro_deploy_integration::*;
12#[cfg(feature = "runtime_measure")]
13#[cfg(target_os = "linux")]
14use procfs::WithCurrentSystemInfo;
15use serde::de::DeserializeOwned;
16
17#[cfg(not(feature = "runtime_measure"))]
18pub async fn run(flow: Dfir<'_>) {
19    launch_flow(flow).await;
20}
21
22pub async fn run_containerized(flow: Dfir<'_>) {
23    launch_flow_containerized(flow).await;
24}
25
/// Drives `flow` via [`launch_flow`] and then prints a one-line CPU usage summary to stdout.
///
/// A panic raised while the flow is running is caught first so the summary is still printed;
/// it surfaces afterwards through the final `unwrap`.
///
/// The summary line starts with the build-time value of `HYDRO_RUNTIME_MEASURE_CPU_PREFIX`,
/// or `"CPU:"` when that variable was not set at compile time. On Linux the three figures are
/// CPU time (total, user, system) divided by the wall-clock time since the process started;
/// on other targets fixed placeholder values are printed.
///
/// # Panics
///
/// Panics if the flow panicked, or (on Linux) if the process statistics cannot be read.
#[cfg(feature = "runtime_measure")]
pub async fn run(flow: Dfir<'_>) {
    // Capture a panic instead of unwinding right away so the CPU line is always emitted.
    let outcome = AssertUnwindSafe(launch_flow(flow)).catch_unwind().await;

    #[cfg(target_os = "linux")]
    {
        let this_process = procfs::process::Process::myself().unwrap();
        let stat = this_process.stat().unwrap();
        let sysinfo = procfs::current_system_info();

        // Wall-clock time elapsed since the process was started.
        let started_at = stat.starttime().get().unwrap();
        let wall_clock = chrono::Local::now() - started_at;
        let wall_clock_ms = wall_clock.num_milliseconds() as f32;

        let ticks_per_second = sysinfo.ticks_per_second() as f32;
        // Converts a tick count to whole milliseconds of CPU time, then relates it to the
        // elapsed wall-clock milliseconds.
        let usage_of = |ticks: u64| -> f32 {
            let cpu_time = chrono::Duration::milliseconds(
                (ticks as f32 / ticks_per_second * 1000.0) as i64,
            );
            cpu_time.num_milliseconds() as f32 / wall_clock_ms
        };

        let total_usage = usage_of(stat.utime + stat.stime);
        let user_usage = usage_of(stat.utime);
        let system_usage = usage_of(stat.stime);

        println!(
            "{} Total {:.4}%, User {:.4}%, System {:.4}%",
            option_env!("HYDRO_RUNTIME_MEASURE_CPU_PREFIX").unwrap_or("CPU:"),
            total_usage,
            user_usage,
            system_usage
        );
    }

    #[cfg(not(target_os = "linux"))]
    {
        // TODO(shadaj): can enable on next sysinfo release
        // use sysinfo::{Pid, System};
        // let system = System::new_all();
        // let process = system.process(Pid::from_u32(std::process::id())).unwrap();
        // let run_time = process.run_time() * 1000;
        // let cpu_time = process.accumulated_cpu_time();
        // let user_cpu_use = cpu_time.user() as f32 / run_time as f32;

        // Placeholder until real measurements are available on this target.
        let placeholder_usage = 100.0;

        println!(
            "{} Total {:.4}%, User {:.4}%, System {:.4}%",
            option_env!("HYDRO_RUNTIME_MEASURE_CPU_PREFIX").unwrap_or("CPU:"),
            placeholder_usage,
            placeholder_usage,
            0.0
        );
    }

    // Re-raise a panic captured from the flow, now that the summary has been printed.
    outcome.unwrap();
}
87
/// Expands to an `async` block that sets up ports and then runs a flow.
///
/// The block awaits `init_no_ack_start`, calls `$f` with a reference to the resulting ports to
/// build the flow, prints `ack start` to stdout, and finally awaits `launch_flow` on that flow.
#[macro_export]
macro_rules! launch {
    ($f:expr) => {
        async {
            let deploy_ports = $crate::runtime_support::launch::init_no_ack_start().await;
            let built_flow = $f(&deploy_ports);

            println!("ack start");

            $crate::runtime_support::launch::launch_flow(built_flow).await
        }
    };
}
101
102pub use crate::launch;
103
104pub async fn launch_flow(mut flow: Dfir<'_>) {
105    let stop = tokio::sync::oneshot::channel();
106    tokio::task::spawn_blocking(|| {
107        let mut line = String::new();
108        std::io::stdin().read_line(&mut line).unwrap();
109        if line.starts_with("stop") {
110            stop.0.send(()).unwrap();
111        } else {
112            eprintln!("Unexpected stdin input: {:?}", line);
113        }
114    });
115
116    let local_set = tokio::task::LocalSet::new();
117    let flow = local_set.run_until(flow.run());
118
119    tokio::select! {
120        _ = stop.1 => {},
121        _ = flow => {}
122    }
123}
124
125pub async fn launch_flow_containerized(mut flow: Dfir<'_>) {
126    let local_set = tokio::task::LocalSet::new();
127    local_set.run_until(flow.run()).await;
128}
129
130pub async fn init_no_ack_start<T: DeserializeOwned + Default>() -> DeployPorts<T> {
131    let mut input = String::new();
132    std::io::stdin().read_line(&mut input).unwrap();
133    let trimmed = input.trim();
134
135    let bind_config = serde_json::from_str::<InitConfig>(trimmed).unwrap();
136
137    // config telling other services how to connect to me
138    let mut bind_results: HashMap<String, ServerPort> = HashMap::new();
139    let mut binds = HashMap::new();
140    for (name, config) in bind_config.0 {
141        let bound = config.bind().await;
142        bind_results.insert(name.clone(), bound.server_port());
143        binds.insert(name.clone(), bound);
144    }
145
146    let bind_serialized = serde_json::to_string(&bind_results).unwrap();
147    println!("ready: {bind_serialized}");
148
149    let mut start_buf = String::new();
150    std::io::stdin().read_line(&mut start_buf).unwrap();
151    let connection_defns = if start_buf.starts_with("start: ") {
152        serde_json::from_str::<HashMap<String, ServerPort>>(
153            start_buf.trim_start_matches("start: ").trim(),
154        )
155        .unwrap()
156    } else {
157        panic!("expected start");
158    };
159
160    let (client_conns, server_conns) = futures::join!(
161        connection_defns
162            .into_iter()
163            .map(|(name, defn)| async move { (name, Connection::AsClient(defn.connect().await)) })
164            .collect::<FuturesUnordered<_>>()
165            .collect::<Vec<_>>(),
166        binds
167            .into_iter()
168            .map(
169                |(name, defn)| async move { (name, Connection::AsServer(accept_bound(defn).await)) }
170            )
171            .collect::<FuturesUnordered<_>>()
172            .collect::<Vec<_>>()
173    );
174
175    let all_connected = client_conns
176        .into_iter()
177        .chain(server_conns.into_iter())
178        .collect();
179
180    DeployPorts {
181        ports: RefCell::new(all_connected),
182        meta: bind_config
183            .1
184            .map(|b| serde_json::from_str(&b).unwrap())
185            .unwrap_or_default(),
186    }
187}
188
189pub async fn init<T: DeserializeOwned + Default>() -> DeployPorts<T> {
190    let ret = init_no_ack_start::<T>().await;
191
192    println!("ack start");
193
194    ret
195}