dfir_rs/scheduled/
metrics.rs1use std::cell::Cell;
4use std::iter::FusedIterator;
5use std::pin::Pin;
6use std::rc::Rc;
7use std::task::{Context, Poll};
8
9use pin_project_lite::pin_project;
10use web_time::{Duration, Instant};
11
12use super::{HandoffId, HandoffTag, SubgraphId, SubgraphTag};
13use crate::util::slot_vec::SecondarySlotVec;
14
15#[derive(Default, Clone)]
16pub(super) struct DfirMetricsState {
17 pub(super) subgraph_metrics: SecondarySlotVec<SubgraphTag, SubgraphMetrics>,
18 pub(super) handoff_metrics: SecondarySlotVec<HandoffTag, HandoffMetrics>,
19}
20
21#[derive(Clone)]
23pub struct DfirMetrics {
24 pub(super) curr: Rc<DfirMetricsState>,
26 pub(super) prev: Option<DfirMetricsState>,
29}
30impl DfirMetrics {
31 pub fn reset(&mut self) {
33 self.prev = Some(self.curr.as_ref().clone());
35 }
36
37 pub fn subgraph_ids(
39 &self,
40 ) -> impl '_ + DoubleEndedIterator<Item = SubgraphId> + FusedIterator + Clone {
41 self.curr.subgraph_metrics.keys()
42 }
43
44 pub fn subgraph_metrics(&self, sg_id: SubgraphId) -> SubgraphMetrics {
46 let curr = &self.curr.subgraph_metrics[sg_id];
47 self.prev
48 .as_ref()
49 .map(|prev| &prev.subgraph_metrics[sg_id])
50 .map(|prev| SubgraphMetrics {
51 total_run_count: Cell::new(curr.total_run_count.get() - prev.total_run_count.get()),
52 total_poll_duration: Cell::new(
53 curr.total_poll_duration.get() - prev.total_poll_duration.get(),
54 ),
55 total_poll_count: Cell::new(
56 curr.total_poll_count.get() - prev.total_poll_count.get(),
57 ),
58 total_idle_duration: Cell::new(
59 curr.total_idle_duration.get() - prev.total_idle_duration.get(),
60 ),
61 total_idle_count: Cell::new(
62 curr.total_idle_count.get() - prev.total_idle_count.get(),
63 ),
64 })
65 .unwrap_or_else(|| curr.clone())
66 }
67
68 pub fn handoff_ids(
70 &self,
71 ) -> impl '_ + DoubleEndedIterator<Item = HandoffId> + FusedIterator + Clone {
72 self.curr.handoff_metrics.keys()
73 }
74
75 pub fn handoff_metrics(&self, handoff_id: HandoffId) -> HandoffMetrics {
77 let curr = &self.curr.handoff_metrics[handoff_id];
78 self.prev
79 .as_ref()
80 .map(|prev| &prev.handoff_metrics[handoff_id])
81 .map(|prev| HandoffMetrics {
82 total_items_count: Cell::new(
83 curr.total_items_count.get() - prev.total_items_count.get(),
84 ),
85 curr_items_count: curr.curr_items_count.clone(),
86 })
87 .unwrap_or_else(|| curr.clone())
88 }
89}
90
/// Declares a `pub struct` whose fields are all `Cell<T>` and generates, for every
/// field, a same-named getter that returns a copy of the cell's contents.
///
/// The generated struct derives `Default`, `Debug` and `Clone` and is marked
/// `#[non_exhaustive]`. Attributes (including doc comments) written on the struct are
/// forwarded to the struct; attributes written on a field are forwarded to both the
/// field and its getter.
macro_rules! define_getters {
    (
        $(#[$outer:meta])*
        pub struct $name:ident {
            $(
                $(#[$meta:meta])*
                $vis:vis $field:ident: Cell<$ty:ty>,
            )*
        }
    ) => {
        $(#[$outer])*
        #[derive(Default, Debug, Clone)]
        #[non_exhaustive]
        pub struct $name {
            $(
                $(#[$meta])*
                $vis $field: Cell<$ty>,
            )*
        }

        impl $name {
            $(
                // The getter reuses the field's attributes, so field docs show up on it too.
                $(#[$meta])*
                pub fn $field(&self) -> $ty {
                    self.$field.get()
                }
            )*
        }
    };
}
122
123define_getters! {
124 pub struct HandoffMetrics {
126 pub(super) curr_items_count: Cell<usize>,
128 pub(super) total_items_count: Cell<usize>,
130 }
131}
132
133define_getters! {
134 pub struct SubgraphMetrics {
136 pub(super) total_run_count: Cell<usize>,
138 pub(super) total_poll_duration: Cell<Duration>,
140 pub(super) total_poll_count: Cell<usize>,
142 pub(super) total_idle_duration: Cell<Duration>,
144 pub(super) total_idle_count: Cell<usize>,
146 }
147}
148
149pin_project! {
150 pub(crate) struct InstrumentSubgraph<'a, Fut> {
152 #[pin]
153 future: Fut,
154 idle_start: Option<Instant>,
155 metrics: &'a SubgraphMetrics,
156 }
157}
158
159impl<'a, Fut> InstrumentSubgraph<'a, Fut> {
160 pub(crate) fn new(future: Fut, metrics: &'a SubgraphMetrics) -> Self {
161 Self {
162 future,
163 idle_start: None,
164 metrics,
165 }
166 }
167}
168
169impl<'a, Fut> Future for InstrumentSubgraph<'a, Fut>
170where
171 Fut: Future,
172{
173 type Output = Fut::Output;
174 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
175 let this = self.project();
176
177 if let Some(idle_start) = this.idle_start {
179 this.metrics
180 .total_idle_duration
181 .update(|x| x + idle_start.elapsed());
182 this.metrics.total_idle_count.update(|x| x + 1);
183 }
184
185 let poll_start = Instant::now();
187 let out = this.future.poll(cx);
188
189 this.metrics
191 .total_poll_duration
192 .update(|x| x + poll_start.elapsed());
193 this.metrics.total_poll_count.update(|x| x + 1);
194
195 this.idle_start.replace(Instant::now());
197
198 out
199 }
200}