hydro_deploy/
progress.rs

1use std::sync::{Arc, Mutex, OnceLock};
2use std::time::Duration;
3
4use futures::Future;
5use indicatif::MultiProgress;
6
7static PROGRESS_TRACKER: OnceLock<Mutex<ProgressTracker>> = OnceLock::new();
8
9tokio::task_local! {
10    static CURRENT_GROUP: Vec<usize>;
11}
12
13#[derive(Clone, PartialEq, Eq, Debug)]
14pub enum LeafStatus {
15    Started,
16    Finished,
17}
18
19#[derive(Debug)]
20pub enum BarTree {
21    Root(Vec<BarTree>),
22    Group(
23        String,
24        Arc<indicatif::ProgressBar>,
25        Vec<BarTree>,
26        Option<usize>,
27    ),
28    Leaf(String, Arc<indicatif::ProgressBar>, LeafStatus),
29    Finished,
30}
31
32impl BarTree {
33    fn get_pb(&self) -> Option<&Arc<indicatif::ProgressBar>> {
34        match self {
35            BarTree::Root(_) => None,
36            BarTree::Group(_, pb, _, _) | BarTree::Leaf(_, pb, _) => Some(pb),
37            BarTree::Finished => None,
38        }
39    }
40
41    fn status(&self) -> LeafStatus {
42        match self {
43            BarTree::Root(children) | BarTree::Group(_, _, children, _) => {
44                if !children.is_empty()
45                    && children
46                        .iter()
47                        .all(|child| child.status() == LeafStatus::Finished)
48                {
49                    LeafStatus::Finished
50                } else {
51                    LeafStatus::Started
52                }
53            }
54            BarTree::Leaf(_, _, status) => status.clone(),
55            BarTree::Finished => LeafStatus::Finished,
56        }
57    }
58
59    fn refresh_prefix(&mut self, cur_path: &[String]) {
60        match self {
61            BarTree::Root(children) => {
62                for child in children {
63                    child.refresh_prefix(cur_path);
64                }
65            }
66            BarTree::Group(name, pb, children, anticipated_total) => {
67                let finished_count = children
68                    .iter()
69                    .filter(|child| child.status() == LeafStatus::Finished)
70                    .count();
71                let started_count = children
72                    .iter()
73                    .filter(|child| child.status() == LeafStatus::Started)
74                    .count();
75
76                let progress_str = if Some(1) == *anticipated_total {
77                    String::new()
78                } else {
79                    let started_or_finished_count = finished_count + started_count;
80                    format!(
81                        " ({}/{}/{})",
82                        finished_count,
83                        started_or_finished_count,
84                        anticipated_total
85                            .filter(|&total| started_or_finished_count <= total)
86                            .map_or_else(|| "?".to_string(), |total| total.to_string()),
87                    )
88                };
89
90                if cur_path.is_empty() {
91                    pb.set_prefix(format!("{}{}", name, progress_str));
92                } else {
93                    pb.set_prefix(format!(
94                        "{} / {}{}",
95                        cur_path.join(" / "),
96                        name,
97                        progress_str,
98                    ));
99                }
100
101                let mut path_with_group = cur_path.to_vec();
102                let non_finished_count = children
103                    .iter()
104                    .filter(|child| child.status() != LeafStatus::Finished)
105                    .count();
106
107                if non_finished_count == 1 {
108                    path_with_group.push(format!("{}{}", name, progress_str));
109                } else {
110                    path_with_group.push(name.clone());
111                }
112
113                for child in children {
114                    child.refresh_prefix(&path_with_group);
115                }
116            }
117            BarTree::Leaf(name, pb, _) => {
118                let mut path_with_group = cur_path.to_vec();
119                path_with_group.push(name.clone());
120                pb.set_prefix(path_with_group.join(" / "));
121            }
122            BarTree::Finished => {}
123        }
124    }
125
126    fn find_node(&self, path: &[usize]) -> &BarTree {
127        if path.is_empty() {
128            return self;
129        }
130
131        match self {
132            BarTree::Root(children) | BarTree::Group(_, _, children, _) => {
133                children[path[0]].find_node(&path[1..])
134            }
135            _ => panic!(),
136        }
137    }
138
139    fn find_node_mut(&mut self, path: &[usize]) -> &mut BarTree {
140        if path.is_empty() {
141            return self;
142        }
143
144        match self {
145            BarTree::Root(children) | BarTree::Group(_, _, children, _) => {
146                children[path[0]].find_node_mut(&path[1..])
147            }
148            _ => panic!(),
149        }
150    }
151}
152
153pub struct ProgressTracker {
154    pub(crate) multi_progress: MultiProgress,
155    tree: BarTree,
156    pub(crate) current_count: usize,
157    progress_list: Vec<(Arc<indicatif::ProgressBar>, bool)>,
158}
159
160impl ProgressTracker {
161    pub(crate) fn new() -> ProgressTracker {
162        ProgressTracker {
163            multi_progress: MultiProgress::new(),
164            tree: BarTree::Root(vec![]),
165            current_count: 0,
166            progress_list: vec![],
167        }
168    }
169
170    pub fn start_task(
171        &mut self,
172        under_path: Vec<usize>,
173        name: String,
174        group: bool,
175        anticipated_total: Option<usize>,
176        progress: bool,
177    ) -> (usize, Arc<indicatif::ProgressBar>) {
178        let surrounding = self.tree.find_node(&under_path);
179        let (surrounding_children, surrounding_pb) = match surrounding {
180            BarTree::Root(children) => (children, None),
181            BarTree::Group(_, pb, children, _) => (children, Some(pb)),
182            _ => panic!(),
183        };
184
185        if let Some(surrounding_pb) = &surrounding_pb {
186            let non_finished_count = surrounding_children
187                .iter()
188                .filter(|child| child.status() != LeafStatus::Finished)
189                .count();
190            if non_finished_count == 0 {
191                self.multi_progress.remove(surrounding_pb.as_ref());
192                let surrounding_idx = self
193                    .progress_list
194                    .iter()
195                    .position(|(pb, _)| Arc::ptr_eq(pb, surrounding_pb))
196                    .unwrap();
197                self.progress_list[surrounding_idx].1 = false;
198            } else if non_finished_count == 1 {
199                let self_idx = self
200                    .progress_list
201                    .iter()
202                    .position(|(pb, _)| Arc::ptr_eq(pb, surrounding_pb))
203                    .unwrap();
204                let last_visible_before = self.progress_list[..self_idx]
205                    .iter()
206                    .rposition(|(_, visible)| *visible);
207                if let Some(last_visible_before) = last_visible_before {
208                    self.multi_progress.insert_after(
209                        &self.progress_list[last_visible_before].0,
210                        surrounding_pb.as_ref().clone(),
211                    );
212                } else {
213                    self.multi_progress
214                        .insert(0, surrounding_pb.as_ref().clone());
215                }
216
217                self.progress_list[self_idx].1 = true;
218            }
219        }
220
221        let surrounding = self.tree.find_node_mut(&under_path);
222        let (surrounding_children, surrounding_pb) = match surrounding {
223            BarTree::Root(children) => (children, None),
224            BarTree::Group(_, pb, children, _) => (children, Some(pb)),
225            _ => panic!(),
226        };
227
228        self.current_count += 1;
229
230        let core_bar = indicatif::ProgressBar::new(100);
231        let previous_bar = surrounding_children
232            .iter()
233            .rev()
234            .flat_map(|c| c.get_pb())
235            .next();
236
237        let index_to_insert = if let Some(previous_bar) = previous_bar {
238            let index_of_prev = self
239                .progress_list
240                .iter()
241                .position(|pb| Arc::ptr_eq(&pb.0, previous_bar))
242                .unwrap();
243            index_of_prev + 1
244        } else if let Some(group_pb) = surrounding_pb {
245            let index_of_group = self
246                .progress_list
247                .iter()
248                .position(|pb| Arc::ptr_eq(&pb.0, group_pb))
249                .unwrap();
250            index_of_group + 1
251        } else if !self.progress_list.is_empty() {
252            self.progress_list.len()
253        } else {
254            0
255        };
256
257        let last_visible = if !self.progress_list.is_empty() {
258            self.progress_list[..index_to_insert]
259                .iter()
260                .rposition(|(_, visible)| *visible)
261        } else {
262            None
263        };
264
265        let created_bar = if let Some(last_visible) = last_visible {
266            self.multi_progress
267                .insert_after(&self.progress_list[last_visible].0, core_bar)
268        } else {
269            self.multi_progress.insert(0, core_bar)
270        };
271
272        let pb = Arc::new(created_bar);
273        self.progress_list
274            .insert(index_to_insert, (pb.clone(), true));
275        if group {
276            surrounding_children.push(BarTree::Group(name, pb.clone(), vec![], anticipated_total));
277        } else {
278            surrounding_children.push(BarTree::Leaf(name, pb.clone(), LeafStatus::Started));
279        }
280
281        let inserted_index = surrounding_children.len() - 1;
282
283        if progress {
284            pb.set_style(
285                indicatif::ProgressStyle::default_bar()
286                    .template("{spinner:.green} {prefix} {wide_msg} {bar} ({elapsed} elapsed)")
287                    .unwrap(),
288            );
289        } else {
290            pb.set_style(
291                indicatif::ProgressStyle::default_bar()
292                    .template("{spinner:.green} {prefix} {wide_msg} ({elapsed} elapsed)")
293                    .unwrap(),
294            );
295        }
296        pb.enable_steady_tick(Duration::from_millis(100));
297
298        self.tree.refresh_prefix(&[]);
299        (inserted_index, pb)
300    }
301
302    pub fn end_task(&mut self, path: Vec<usize>) {
303        let parent = self.tree.find_node_mut(&path[0..path.len() - 1]);
304        match parent {
305            BarTree::Root(children) | BarTree::Group(_, _, children, _) => {
306                let removed = children[*path.last().unwrap()].get_pb().unwrap().clone();
307                children[*path.last().unwrap()] = BarTree::Finished;
308                self.multi_progress.remove(&removed);
309                self.progress_list
310                    .retain(|(pb, _)| !Arc::ptr_eq(pb, &removed));
311
312                let non_finished_count = children
313                    .iter()
314                    .filter(|child| child.status() != LeafStatus::Finished)
315                    .count();
316                if let BarTree::Group(_, pb, _, _) = parent {
317                    if non_finished_count == 1 {
318                        self.multi_progress.remove(pb.as_ref());
319                        self.progress_list
320                            .iter_mut()
321                            .find(|(pb2, _)| Arc::ptr_eq(pb2, pb))
322                            .unwrap()
323                            .1 = false;
324                    } else if non_finished_count == 0 {
325                        let self_idx = self
326                            .progress_list
327                            .iter()
328                            .position(|(pb2, _)| Arc::ptr_eq(pb2, pb))
329                            .unwrap();
330
331                        let last_visible_before = self.progress_list[..self_idx]
332                            .iter()
333                            .rposition(|(_, visible)| *visible);
334
335                        if let Some(last_visible_before) = last_visible_before {
336                            self.multi_progress.insert_after(
337                                &self.progress_list[last_visible_before].0,
338                                pb.as_ref().clone(),
339                            );
340                        } else {
341                            self.multi_progress.insert(0, pb.as_ref().clone());
342                        }
343
344                        self.progress_list[self_idx].1 = true;
345                    }
346                }
347            }
348
349            _ => panic!(),
350        };
351
352        self.tree.refresh_prefix(&[]);
353
354        self.current_count -= 1;
355        if self.current_count == 0 {
356            self.multi_progress.clear().unwrap();
357        }
358    }
359}
360
361impl ProgressTracker {
362    pub fn println(msg: impl AsRef<str>) {
363        let progress_bar = PROGRESS_TRACKER
364            .get_or_init(|| Mutex::new(ProgressTracker::new()))
365            .lock()
366            .unwrap();
367
368        progress_bar.multi_progress.suspend(|| {
369            println!("{}", msg.as_ref());
370        });
371    }
372
373    pub fn eprintln(msg: impl AsRef<str>) {
374        let progress_bar = PROGRESS_TRACKER
375            .get_or_init(|| Mutex::new(ProgressTracker::new()))
376            .lock()
377            .unwrap();
378
379        progress_bar.multi_progress.suspend(|| {
380            eprintln!("{}", msg.as_ref());
381        });
382    }
383
384    pub fn with_group<'a, T, F: Future<Output = T>>(
385        name: impl Into<String>,
386        anticipated_total: Option<usize>,
387        f: impl FnOnce() -> F + 'a,
388    ) -> impl Future<Output = T> + 'a {
389        let mut group = CURRENT_GROUP
390            .try_with(|cur| cur.clone())
391            .unwrap_or_default();
392
393        let (group_i, _) = {
394            let mut progress_bar = PROGRESS_TRACKER
395                .get_or_init(|| Mutex::new(ProgressTracker::new()))
396                .lock()
397                .unwrap();
398            progress_bar.start_task(group.clone(), name.into(), true, anticipated_total, false)
399        };
400
401        group.push(group_i);
402
403        CURRENT_GROUP.scope(group.clone(), async {
404            let out = f().await;
405            let mut progress_bar = PROGRESS_TRACKER
406                .get_or_init(|| Mutex::new(ProgressTracker::new()))
407                .lock()
408                .unwrap();
409            progress_bar.end_task(group);
410            out
411        })
412    }
413
414    pub fn leaf<T, F: Future<Output = T>>(
415        name: impl Into<String>,
416        f: F,
417    ) -> impl Future<Output = T> {
418        let mut group = CURRENT_GROUP
419            .try_with(|cur| cur.clone())
420            .unwrap_or_default();
421
422        let (leaf_i, _) = {
423            let mut progress_bar = PROGRESS_TRACKER
424                .get_or_init(|| Mutex::new(ProgressTracker::new()))
425                .lock()
426                .unwrap();
427            progress_bar.start_task(group.clone(), name.into(), false, None, false)
428        };
429
430        group.push(leaf_i);
431
432        async move {
433            let out = f.await;
434            let mut progress_bar = PROGRESS_TRACKER
435                .get_or_init(|| Mutex::new(ProgressTracker::new()))
436                .lock()
437                .unwrap();
438            progress_bar.end_task(group);
439            out
440        }
441    }
442
443    pub fn rich_leaf<'a, T, F: Future<Output = T>>(
444        name: impl Into<String>,
445        f: impl FnOnce(Box<dyn Fn(String) + Send + Sync>) -> F + 'a,
446    ) -> impl Future<Output = T> + 'a {
447        let mut group = CURRENT_GROUP
448            .try_with(|cur| cur.clone())
449            .unwrap_or_default();
450
451        let (leaf_i, bar) = {
452            let mut progress_bar = PROGRESS_TRACKER
453                .get_or_init(|| Mutex::new(ProgressTracker::new()))
454                .lock()
455                .unwrap();
456            progress_bar.start_task(group.clone(), name.into(), false, None, false)
457        };
458
459        group.push(leaf_i);
460
461        async move {
462            let my_bar = bar.clone();
463            let out = f(Box::new(move |msg| {
464                my_bar.set_message(msg);
465            }))
466            .await;
467            let mut progress_bar = PROGRESS_TRACKER
468                .get_or_init(|| Mutex::new(ProgressTracker::new()))
469                .lock()
470                .unwrap();
471            progress_bar.end_task(group);
472            out
473        }
474    }
475
476    pub fn progress_leaf<'a, T, F: Future<Output = T>>(
477        name: impl Into<String>,
478        f: impl FnOnce(Box<dyn Fn(u64) + Send + Sync>, Box<dyn Fn(String) + Send + Sync>) -> F + 'a,
479    ) -> impl Future<Output = T> + 'a {
480        let mut group = CURRENT_GROUP
481            .try_with(|cur| cur.clone())
482            .unwrap_or_default();
483
484        let (leaf_i, bar) = {
485            let mut progress_bar = PROGRESS_TRACKER
486                .get_or_init(|| Mutex::new(ProgressTracker::new()))
487                .lock()
488                .unwrap();
489            progress_bar.start_task(group.clone(), name.into(), false, None, true)
490        };
491
492        group.push(leaf_i);
493
494        async move {
495            let my_bar = bar.clone();
496            let my_bar_2 = bar.clone();
497            let out = f(
498                Box::new(move |progress| {
499                    my_bar.set_position(progress);
500                }),
501                Box::new(move |msg| {
502                    my_bar_2.set_message(msg);
503                }),
504            )
505            .await;
506            let mut progress_bar = PROGRESS_TRACKER
507                .get_or_init(|| Mutex::new(ProgressTracker::new()))
508                .lock()
509                .unwrap();
510            progress_bar.end_task(group);
511            out
512        }
513    }
514}