hydro_deploy/
progress.rs

1use std::sync::{Arc, Mutex, OnceLock};
2use std::time::Duration;
3
4use futures::Future;
5use indicatif::MultiProgress;
6
/// Process-wide progress tracker shared by every task.
///
/// It is created lazily: each accessor in this file goes through
/// `get_or_init(|| Mutex::new(ProgressTracker::new()))` before locking it.
static PROGRESS_TRACKER: OnceLock<Mutex<ProgressTracker>> = OnceLock::new();

tokio::task_local! {
    // Path of child indices (from the tree root) identifying the group the
    // current task runs in. `ProgressTracker::with_group` installs it via
    // `scope`; when it is unset, new tasks are attached directly to the root.
    static CURRENT_GROUP: Vec<usize>;
}
12
/// Completion state reported by a node of the progress tree.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum LeafStatus {
    /// The task has been started and has not finished yet.
    Started,
    /// The task has finished.
    Finished,
}
18
/// Tree of running tasks; nodes are addressed by a path of child indices.
#[derive(Debug)]
pub enum BarTree {
    /// Top of the tree. Holds the top-level tasks and has no bar of its own.
    Root(Vec<BarTree>),
    /// A named group of tasks.
    ///
    /// Fields, in order: display name, the group's own progress bar, child
    /// nodes, and the anticipated number of children (if known up front).
    Group(
        String,
        Arc<indicatif::ProgressBar>,
        Vec<BarTree>,
        Option<usize>,
    ),
    /// A single task: display name, its progress bar, and its status.
    Leaf(String, Arc<indicatif::ProgressBar>, LeafStatus),
    /// Placeholder that replaces a node once its task has ended. The slot is
    /// kept rather than removed, so the indices of sibling nodes do not shift.
    Finished,
}
31
32impl BarTree {
33    fn get_pb(&self) -> Option<&Arc<indicatif::ProgressBar>> {
34        match self {
35            BarTree::Root(_) => None,
36            BarTree::Group(_, pb, _, _) | BarTree::Leaf(_, pb, _) => Some(pb),
37            BarTree::Finished => None,
38        }
39    }
40
41    fn status(&self) -> LeafStatus {
42        match self {
43            BarTree::Root(children) | BarTree::Group(_, _, children, _) => {
44                if !children.is_empty()
45                    && children
46                        .iter()
47                        .all(|child| child.status() == LeafStatus::Finished)
48                {
49                    LeafStatus::Finished
50                } else {
51                    LeafStatus::Started
52                }
53            }
54            BarTree::Leaf(_, _, status) => status.clone(),
55            BarTree::Finished => LeafStatus::Finished,
56        }
57    }
58
59    fn refresh_prefix(&mut self, cur_path: &[String]) {
60        match self {
61            BarTree::Root(children) => {
62                for child in children {
63                    child.refresh_prefix(cur_path);
64                }
65            }
66            BarTree::Group(name, pb, children, anticipated_total) => {
67                let finished_count = children
68                    .iter()
69                    .filter(|child| child.status() == LeafStatus::Finished)
70                    .count();
71                let started_count = children
72                    .iter()
73                    .filter(|child| child.status() == LeafStatus::Started)
74                    .count();
75                let queued_count =
76                    anticipated_total.map(|total| total - finished_count - started_count);
77
78                let progress_str =
79                    if anticipated_total.iter().any(|v| *v == 1) && started_count == 1 {
80                        "".to_string()
81                    } else {
82                        match queued_count {
83                            Some(queued_count) => {
84                                format!(
85                                    " ({}/{}/{})",
86                                    finished_count,
87                                    started_count,
88                                    queued_count + finished_count + started_count
89                                )
90                            }
91                            None => format!(" ({}/{}/?)", finished_count, started_count),
92                        }
93                    };
94
95                if cur_path.is_empty() {
96                    pb.set_prefix(format!("{}{}", name, progress_str));
97                } else {
98                    pb.set_prefix(format!(
99                        "{} / {}{}",
100                        cur_path.join(" / "),
101                        name,
102                        progress_str,
103                    ));
104                }
105
106                let mut path_with_group = cur_path.to_vec();
107                let non_finished_count = children
108                    .iter()
109                    .filter(|child| child.status() != LeafStatus::Finished)
110                    .count();
111
112                if non_finished_count == 1 {
113                    path_with_group.push(format!("{}{}", name, progress_str));
114                } else {
115                    path_with_group.push(name.clone());
116                }
117
118                for child in children {
119                    child.refresh_prefix(&path_with_group);
120                }
121            }
122            BarTree::Leaf(name, pb, _) => {
123                let mut path_with_group = cur_path.to_vec();
124                path_with_group.push(name.clone());
125                pb.set_prefix(path_with_group.join(" / "));
126            }
127            BarTree::Finished => {}
128        }
129    }
130
131    fn find_node(&self, path: &[usize]) -> &BarTree {
132        if path.is_empty() {
133            return self;
134        }
135
136        match self {
137            BarTree::Root(children) | BarTree::Group(_, _, children, _) => {
138                children[path[0]].find_node(&path[1..])
139            }
140            _ => panic!(),
141        }
142    }
143
144    fn find_node_mut(&mut self, path: &[usize]) -> &mut BarTree {
145        if path.is_empty() {
146            return self;
147        }
148
149        match self {
150            BarTree::Root(children) | BarTree::Group(_, _, children, _) => {
151                children[path[0]].find_node_mut(&path[1..])
152            }
153            _ => panic!(),
154        }
155    }
156}
157
/// Owns the task tree and the terminal bars that render it.
pub struct ProgressTracker {
    /// The `indicatif` container that the bars are inserted into and removed from.
    pub(crate) multi_progress: MultiProgress,
    /// Tree of tasks; always a `BarTree::Root` at the top.
    tree: BarTree,
    /// Number of tasks started but not yet ended. When it drops back to zero
    /// in `end_task`, the `multi_progress` is cleared.
    pub(crate) current_count: usize,
    /// Every live bar in display order, paired with a flag recording whether
    /// the bar is currently inserted (visible) in `multi_progress`.
    progress_list: Vec<(Arc<indicatif::ProgressBar>, bool)>,
}
164
165impl ProgressTracker {
166    pub(crate) fn new() -> ProgressTracker {
167        ProgressTracker {
168            multi_progress: MultiProgress::new(),
169            tree: BarTree::Root(vec![]),
170            current_count: 0,
171            progress_list: vec![],
172        }
173    }
174
175    pub fn start_task(
176        &mut self,
177        under_path: Vec<usize>,
178        name: String,
179        group: bool,
180        anticipated_total: Option<usize>,
181        progress: bool,
182    ) -> (usize, Arc<indicatif::ProgressBar>) {
183        let surrounding = self.tree.find_node(&under_path);
184        let (surrounding_children, surrounding_pb) = match surrounding {
185            BarTree::Root(children) => (children, None),
186            BarTree::Group(_, pb, children, _) => (children, Some(pb)),
187            _ => panic!(),
188        };
189
190        if let Some(surrounding_pb) = &surrounding_pb {
191            let non_finished_count = surrounding_children
192                .iter()
193                .filter(|child| child.status() != LeafStatus::Finished)
194                .count();
195            if non_finished_count == 0 {
196                self.multi_progress.remove(surrounding_pb.as_ref());
197                let surrounding_idx = self
198                    .progress_list
199                    .iter()
200                    .position(|(pb, _)| Arc::ptr_eq(pb, surrounding_pb))
201                    .unwrap();
202                self.progress_list[surrounding_idx].1 = false;
203            } else if non_finished_count == 1 {
204                let self_idx = self
205                    .progress_list
206                    .iter()
207                    .position(|(pb, _)| Arc::ptr_eq(pb, surrounding_pb))
208                    .unwrap();
209                let last_visible_before = self.progress_list[..self_idx]
210                    .iter()
211                    .rposition(|(_, visible)| *visible);
212                if let Some(last_visible_before) = last_visible_before {
213                    self.multi_progress.insert_after(
214                        &self.progress_list[last_visible_before].0,
215                        surrounding_pb.as_ref().clone(),
216                    );
217                } else {
218                    self.multi_progress
219                        .insert(0, surrounding_pb.as_ref().clone());
220                }
221
222                self.progress_list[self_idx].1 = true;
223            }
224        }
225
226        let surrounding = self.tree.find_node_mut(&under_path);
227        let (surrounding_children, surrounding_pb) = match surrounding {
228            BarTree::Root(children) => (children, None),
229            BarTree::Group(_, pb, children, _) => (children, Some(pb)),
230            _ => panic!(),
231        };
232
233        self.current_count += 1;
234
235        let core_bar = indicatif::ProgressBar::new(100);
236        let previous_bar = surrounding_children
237            .iter()
238            .rev()
239            .flat_map(|c| c.get_pb())
240            .next();
241
242        let index_to_insert = if let Some(previous_bar) = previous_bar {
243            let index_of_prev = self
244                .progress_list
245                .iter()
246                .position(|pb| Arc::ptr_eq(&pb.0, previous_bar))
247                .unwrap();
248            index_of_prev + 1
249        } else if let Some(group_pb) = surrounding_pb {
250            let index_of_group = self
251                .progress_list
252                .iter()
253                .position(|pb| Arc::ptr_eq(&pb.0, group_pb))
254                .unwrap();
255            index_of_group + 1
256        } else if !self.progress_list.is_empty() {
257            self.progress_list.len()
258        } else {
259            0
260        };
261
262        let last_visible = if !self.progress_list.is_empty() {
263            self.progress_list[..index_to_insert]
264                .iter()
265                .rposition(|(_, visible)| *visible)
266        } else {
267            None
268        };
269
270        let created_bar = if let Some(last_visible) = last_visible {
271            self.multi_progress
272                .insert_after(&self.progress_list[last_visible].0, core_bar)
273        } else {
274            self.multi_progress.insert(0, core_bar)
275        };
276
277        let pb = Arc::new(created_bar);
278        self.progress_list
279            .insert(index_to_insert, (pb.clone(), true));
280        if group {
281            surrounding_children.push(BarTree::Group(name, pb.clone(), vec![], anticipated_total));
282        } else {
283            surrounding_children.push(BarTree::Leaf(name, pb.clone(), LeafStatus::Started));
284        }
285
286        let inserted_index = surrounding_children.len() - 1;
287
288        if progress {
289            pb.set_style(
290                indicatif::ProgressStyle::default_bar()
291                    .template("{spinner:.green} {prefix} {wide_msg} {bar} ({elapsed} elapsed)")
292                    .unwrap(),
293            );
294        } else {
295            pb.set_style(
296                indicatif::ProgressStyle::default_bar()
297                    .template("{spinner:.green} {prefix} {wide_msg} ({elapsed} elapsed)")
298                    .unwrap(),
299            );
300        }
301        pb.enable_steady_tick(Duration::from_millis(100));
302
303        self.tree.refresh_prefix(&[]);
304        (inserted_index, pb)
305    }
306
307    pub fn end_task(&mut self, path: Vec<usize>) {
308        let parent = self.tree.find_node_mut(&path[0..path.len() - 1]);
309        match parent {
310            BarTree::Root(children) | BarTree::Group(_, _, children, _) => {
311                let removed = children[*path.last().unwrap()].get_pb().unwrap().clone();
312                children[*path.last().unwrap()] = BarTree::Finished;
313                self.multi_progress.remove(&removed);
314                self.progress_list
315                    .retain(|(pb, _)| !Arc::ptr_eq(pb, &removed));
316
317                let non_finished_count = children
318                    .iter()
319                    .filter(|child| child.status() != LeafStatus::Finished)
320                    .count();
321                if let BarTree::Group(_, pb, _, _) = parent {
322                    if non_finished_count == 1 {
323                        self.multi_progress.remove(pb.as_ref());
324                        self.progress_list
325                            .iter_mut()
326                            .find(|(pb2, _)| Arc::ptr_eq(pb2, pb))
327                            .unwrap()
328                            .1 = false;
329                    } else if non_finished_count == 0 {
330                        let self_idx = self
331                            .progress_list
332                            .iter()
333                            .position(|(pb2, _)| Arc::ptr_eq(pb2, pb))
334                            .unwrap();
335
336                        let last_visible_before = self.progress_list[..self_idx]
337                            .iter()
338                            .rposition(|(_, visible)| *visible);
339
340                        if let Some(last_visible_before) = last_visible_before {
341                            self.multi_progress.insert_after(
342                                &self.progress_list[last_visible_before].0,
343                                pb.as_ref().clone(),
344                            );
345                        } else {
346                            self.multi_progress.insert(0, pb.as_ref().clone());
347                        }
348
349                        self.progress_list[self_idx].1 = true;
350                    }
351                }
352            }
353
354            _ => panic!(),
355        };
356
357        self.tree.refresh_prefix(&[]);
358
359        self.current_count -= 1;
360        if self.current_count == 0 {
361            self.multi_progress.clear().unwrap();
362        }
363    }
364}
365
366impl ProgressTracker {
367    pub fn println(msg: impl AsRef<str>) {
368        let progress_bar = PROGRESS_TRACKER
369            .get_or_init(|| Mutex::new(ProgressTracker::new()))
370            .lock()
371            .unwrap();
372
373        progress_bar.multi_progress.suspend(|| {
374            println!("{}", msg.as_ref());
375        });
376    }
377
378    pub fn eprintln(msg: impl AsRef<str>) {
379        let progress_bar = PROGRESS_TRACKER
380            .get_or_init(|| Mutex::new(ProgressTracker::new()))
381            .lock()
382            .unwrap();
383
384        progress_bar.multi_progress.suspend(|| {
385            eprintln!("{}", msg.as_ref());
386        });
387    }
388
389    pub fn with_group<'a, T, F: Future<Output = T>>(
390        name: impl Into<String>,
391        anticipated_total: Option<usize>,
392        f: impl FnOnce() -> F + 'a,
393    ) -> impl Future<Output = T> + 'a {
394        let mut group = CURRENT_GROUP
395            .try_with(|cur| cur.clone())
396            .unwrap_or_default();
397
398        let (group_i, _) = {
399            let mut progress_bar = PROGRESS_TRACKER
400                .get_or_init(|| Mutex::new(ProgressTracker::new()))
401                .lock()
402                .unwrap();
403            progress_bar.start_task(group.clone(), name.into(), true, anticipated_total, false)
404        };
405
406        group.push(group_i);
407
408        CURRENT_GROUP.scope(group.clone(), async {
409            let out = f().await;
410            let mut progress_bar = PROGRESS_TRACKER
411                .get_or_init(|| Mutex::new(ProgressTracker::new()))
412                .lock()
413                .unwrap();
414            progress_bar.end_task(group);
415            out
416        })
417    }
418
419    pub fn leaf<T, F: Future<Output = T>>(
420        name: impl Into<String>,
421        f: F,
422    ) -> impl Future<Output = T> {
423        let mut group = CURRENT_GROUP
424            .try_with(|cur| cur.clone())
425            .unwrap_or_default();
426
427        let (leaf_i, _) = {
428            let mut progress_bar = PROGRESS_TRACKER
429                .get_or_init(|| Mutex::new(ProgressTracker::new()))
430                .lock()
431                .unwrap();
432            progress_bar.start_task(group.clone(), name.into(), false, None, false)
433        };
434
435        group.push(leaf_i);
436
437        async move {
438            let out = f.await;
439            let mut progress_bar = PROGRESS_TRACKER
440                .get_or_init(|| Mutex::new(ProgressTracker::new()))
441                .lock()
442                .unwrap();
443            progress_bar.end_task(group);
444            out
445        }
446    }
447
448    pub fn rich_leaf<'a, T, F: Future<Output = T>>(
449        name: impl Into<String>,
450        f: impl FnOnce(Box<dyn Fn(String) + Send + Sync>) -> F + 'a,
451    ) -> impl Future<Output = T> + 'a {
452        let mut group = CURRENT_GROUP
453            .try_with(|cur| cur.clone())
454            .unwrap_or_default();
455
456        let (leaf_i, bar) = {
457            let mut progress_bar = PROGRESS_TRACKER
458                .get_or_init(|| Mutex::new(ProgressTracker::new()))
459                .lock()
460                .unwrap();
461            progress_bar.start_task(group.clone(), name.into(), false, None, false)
462        };
463
464        group.push(leaf_i);
465
466        async move {
467            let my_bar = bar.clone();
468            let out = f(Box::new(move |msg| {
469                my_bar.set_message(msg);
470            }))
471            .await;
472            let mut progress_bar = PROGRESS_TRACKER
473                .get_or_init(|| Mutex::new(ProgressTracker::new()))
474                .lock()
475                .unwrap();
476            progress_bar.end_task(group);
477            out
478        }
479    }
480
481    pub fn progress_leaf<'a, T, F: Future<Output = T>>(
482        name: impl Into<String>,
483        f: impl FnOnce(Box<dyn Fn(u64) + Send + Sync>, Box<dyn Fn(String) + Send + Sync>) -> F + 'a,
484    ) -> impl Future<Output = T> + 'a {
485        let mut group = CURRENT_GROUP
486            .try_with(|cur| cur.clone())
487            .unwrap_or_default();
488
489        let (leaf_i, bar) = {
490            let mut progress_bar = PROGRESS_TRACKER
491                .get_or_init(|| Mutex::new(ProgressTracker::new()))
492                .lock()
493                .unwrap();
494            progress_bar.start_task(group.clone(), name.into(), false, None, true)
495        };
496
497        group.push(leaf_i);
498
499        async move {
500            let my_bar = bar.clone();
501            let my_bar_2 = bar.clone();
502            let out = f(
503                Box::new(move |progress| {
504                    my_bar.set_position(progress);
505                }),
506                Box::new(move |msg| {
507                    my_bar_2.set_message(msg);
508                }),
509            )
510            .await;
511            let mut progress_bar = PROGRESS_TRACKER
512                .get_or_init(|| Mutex::new(ProgressTracker::new()))
513                .lock()
514                .unwrap();
515            progress_bar.end_task(group);
516            out
517        }
518    }
519}