dfir_rs/scheduled/
metrics.rs1use std::cell::Cell;
4use std::iter::FusedIterator;
5use std::pin::Pin;
6use std::rc::Rc;
7use std::task::{Context, Poll};
8
9use pin_project_lite::pin_project;
10use web_time::{Duration, Instant};
11
12#[expect(unused_imports, reason = "used for rustdoc links")]
13use super::graph::Dfir;
14use super::{HandoffTag, SubgraphTag};
15use crate::util::slot_vec::SecondarySlotVec;
16
/// Collection of runtime metrics, with one entry per subgraph and one per handoff.
///
/// Individual values live in `Cell`s inside [`SubgraphMetrics`] and [`HandoffMetrics`] and are
/// read through their getter methods.
#[derive(Default, Clone)]
#[non_exhaustive]
pub struct DfirMetrics {
    /// Metrics for each subgraph, keyed by subgraph tag.
    pub subgraphs: SecondarySlotVec<SubgraphTag, SubgraphMetrics>,
    /// Metrics for each handoff, keyed by handoff tag.
    pub handoffs: SecondarySlotVec<HandoffTag, HandoffMetrics>,
}
29
30impl DfirMetrics {
31 pub(super) fn diff(&mut self, other: &Self) {
33 for (sg_id, prev_sg_metrics) in other.subgraphs.iter() {
34 if let Some(curr_sg_metrics) = self.subgraphs.get_mut(sg_id) {
35 curr_sg_metrics.diff(prev_sg_metrics);
36 }
37 }
38 for (handoff_id, prev_handoff_metrics) in other.handoffs.iter() {
39 if let Some(curr_handoff_metrics) = self.handoffs.get_mut(handoff_id) {
40 curr_handoff_metrics.diff(prev_handoff_metrics);
41 }
42 }
43 }
44}
45
/// Never-ending iterator that reports [`DfirMetrics`] one interval at a time.
///
/// Each call to `next` snapshots `curr`. While `prev` is `None` the snapshot is returned
/// unchanged; afterwards the returned value is the snapshot diffed against the snapshot taken by
/// the preceding call.
#[derive(Clone)]
pub struct DfirMetricsIntervals {
    /// Shared handle to the live metrics that are being sampled.
    pub(super) curr: Rc<DfirMetrics>,
    /// Snapshot stored by the previous `next` call, or `None` if there has not been one.
    pub(super) prev: Option<DfirMetrics>,
}
54
55impl Iterator for DfirMetricsIntervals {
56 type Item = DfirMetrics;
57
58 fn next(&mut self) -> Option<Self::Item> {
59 let mut curr = self.curr.as_ref().clone();
60 if let Some(prev) = self.prev.replace(curr.clone()) {
61 curr.diff(&prev);
62 }
63 Some(curr)
64 }
65
66 fn size_hint(&self) -> (usize, Option<usize>) {
67 (usize::MAX, None)
68 }
69
70 #[track_caller]
71 fn last(self) -> Option<DfirMetrics> {
72 panic!("iterator is infinite");
73 }
74
75 #[track_caller]
76 fn count(self) -> usize {
77 panic!("iterator is infinite");
78 }
79}
80
/// `next` always returns `Some`, so the iterator is never exhausted and the fused contract holds
/// trivially.
impl FusedIterator for DfirMetricsIntervals {}
82
/// Declares a metrics struct made of `Cell` fields, plus a getter per field and a `diff` method.
///
/// Every field must be written as, in this order: optional doc comments, a mandatory
/// `#[diff(total)]` or `#[diff(curr)]` marker, optional further attributes, then
/// `vis name: Cell<Type>,` with a trailing comma.
///
/// The expansion contains:
/// * the struct itself, deriving `Default`, `Debug` and `Clone` and marked `#[non_exhaustive]`
///   (the field doc comments and the `#[diff]` marker are not placed on the fields);
/// * a public getter named after each field that returns the cell's value and carries the field's
///   doc comments;
/// * a private `diff(&mut self, other: &Self)` that handles each field through
///   `define_metrics_diff_field!` according to its marker.
macro_rules! define_metrics {
    (
        $(#[$outer:meta])*
        pub struct $name:ident {
            $(
                $( #[doc = $getter_doc:literal] )*
                #[diff($mode:ident)]
                $( #[$inner:meta] )*
                $vis:vis $field:ident: Cell<$ty:ty>,
            )*
        }
    ) => {
        $(#[$outer])*
        #[derive(Default, Debug, Clone)]
        #[non_exhaustive]
        pub struct $name {
            $(
                $(#[$inner])*
                $vis $field: Cell<$ty>,
            )*
        }

        impl $name {
            // One read-only accessor per field; the field docs end up here.
            $(
                $( #[doc = $getter_doc] )*
                pub fn $field(&self) -> $ty {
                    self.$field.get()
                }
            )*

            // Per-field diff against an earlier snapshot, driven by the `#[diff(..)]` marker.
            fn diff(&mut self, other: &Self) {
                $(
                    define_metrics_diff_field!($mode, $field, self, other);
                )*
            }
        }
    };
}
122
/// Expands to the statements that diff one `Cell` field of `$curr` against the same field of
/// `$prev`, according to the field's diff mode.
///
/// * `total`: the field is a running total, so the earlier value is subtracted from it in place.
///   Debug builds first assert that the earlier value does not exceed the current one.
/// * `curr`: the field is a point-in-time value, so nothing is emitted and it stays as it is.
macro_rules! define_metrics_diff_field {
    (total, $name:ident, $curr:ident, $prev:ident) => {
        debug_assert!($prev.$name.get() <= $curr.$name.get());
        $curr.$name.set($curr.$name.get() - $prev.$name.get());
    };
    (curr, $name:ident, $curr:ident, $prev:ident) => {};
}
130
define_metrics! {
    /// Metrics recorded for a single handoff.
    pub struct HandoffMetrics {
        /// Current item count. This is a point-in-time value (`#[diff(curr)]`), so interval
        /// diffs leave it unchanged.
        #[diff(curr)]
        pub(super) curr_items_count: Cell<usize>,

        /// Running total item count. In an interval diff (`#[diff(total)]`) this is the
        /// increase since the previous snapshot.
        #[diff(total)]
        pub(super) total_items_count: Cell<usize>,
    }
}
143
define_metrics! {
    /// Metrics recorded for a single subgraph. Every field is a running total, so in an
    /// interval diff each value is the increase since the previous snapshot.
    pub struct SubgraphMetrics {
        /// Running total run count.
        #[diff(total)]
        pub(super) total_run_count: Cell<usize>,

        /// Cumulative time spent inside polls of the instrumented future.
        #[diff(total)]
        pub(super) total_poll_duration: Cell<Duration>,

        /// Number of times the instrumented future has been polled.
        #[diff(total)]
        pub(super) total_poll_count: Cell<usize>,

        /// Cumulative time between the end of one poll and the start of the next.
        #[diff(total)]
        pub(super) total_idle_duration: Cell<Duration>,

        /// Number of idle periods (gaps between consecutive polls) that have been recorded.
        #[diff(total)]
        pub(super) total_idle_count: Cell<usize>,
    }
}
168
pin_project! {
    /// Future wrapper that records how long the inner future spends being polled, and how long
    /// it sits between polls, into a [`SubgraphMetrics`].
    pub(crate) struct InstrumentSubgraph<'a, Fut> {
        // The wrapped future; pinned so it can be polled through the projection.
        #[pin]
        future: Fut,
        // When the most recent poll finished; `None` until a poll has completed.
        idle_start: Option<Instant>,
        // Metrics cells that every poll writes to.
        metrics: &'a SubgraphMetrics,
    }
}
178
179impl<'a, Fut> InstrumentSubgraph<'a, Fut> {
180 pub(crate) fn new(future: Fut, metrics: &'a SubgraphMetrics) -> Self {
181 Self {
182 future,
183 idle_start: None,
184 metrics,
185 }
186 }
187}
188
189impl<'a, Fut> Future for InstrumentSubgraph<'a, Fut>
190where
191 Fut: Future,
192{
193 type Output = Fut::Output;
194 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195 let this = self.project();
196
197 if let Some(idle_start) = this.idle_start {
199 this.metrics
200 .total_idle_duration
201 .update(|x| x + idle_start.elapsed());
202 this.metrics.total_idle_count.update(|x| x + 1);
203 }
204
205 let poll_start = Instant::now();
207 let out = this.future.poll(cx);
208
209 this.metrics
211 .total_poll_duration
212 .update(|x| x + poll_start.elapsed());
213 this.metrics.total_poll_count.update(|x| x + 1);
214
215 this.idle_start.replace(Instant::now());
217
218 out
219 }
220}
221
#[cfg(test)]
mod test {
    use super::*;
    use crate::scheduled::{HandoffId, SubgraphId};

    /// Checks that the first interval yields the raw totals and that the second yields the
    /// change since the first, for counters and durations alike, while the point-in-time
    /// `curr_items_count` is passed through undiffed.
    #[test]
    fn test_dfir_metrics_intervals() {
        let sg_id = SubgraphId::from_raw(0);
        let handoff_id = HandoffId::from_raw(0);

        let mut metrics = DfirMetrics::default();
        metrics.subgraphs.insert(
            sg_id,
            SubgraphMetrics {
                total_run_count: Cell::new(5),
                total_poll_count: Cell::new(10),
                total_idle_count: Cell::new(2),
                total_poll_duration: Cell::new(Duration::from_millis(500)),
                total_idle_duration: Cell::new(Duration::from_millis(200)),
            },
        );
        metrics.handoffs.insert(
            handoff_id,
            HandoffMetrics {
                curr_items_count: Cell::new(3),
                total_items_count: Cell::new(100),
            },
        );
        let metrics = Rc::new(metrics);

        let mut intervals = DfirMetricsIntervals {
            curr: Rc::clone(&metrics),
            prev: None,
        };

        // First interval: there is no previous snapshot, so the totals come back unchanged.
        let first = intervals.next().unwrap();
        let sg_metrics = &first.subgraphs[sg_id];
        assert_eq!(sg_metrics.total_run_count(), 5);
        assert_eq!(sg_metrics.total_poll_duration(), Duration::from_millis(500));
        assert_eq!(sg_metrics.total_idle_duration(), Duration::from_millis(200));
        let hoff_metrics = &first.handoffs[handoff_id];
        assert_eq!(hoff_metrics.total_items_count(), 100);
        assert_eq!(hoff_metrics.curr_items_count(), 3);

        // Advance the live metrics through the shared handle.
        let sg_metrics = &metrics.subgraphs[sg_id];
        sg_metrics.total_run_count.set(12);
        sg_metrics.total_poll_count.set(25);
        sg_metrics.total_idle_count.set(7);
        sg_metrics
            .total_poll_duration
            .set(Duration::from_millis(1200));
        sg_metrics
            .total_idle_duration
            .set(Duration::from_millis(600));
        let hoff_metrics = &metrics.handoffs[handoff_id];
        hoff_metrics.total_items_count.set(250);
        hoff_metrics.curr_items_count.set(10);

        // Second interval: totals are reported as the difference from the first snapshot.
        let second = intervals.next().unwrap();
        let sg_metrics = &second.subgraphs[sg_id];
        assert_eq!(sg_metrics.total_run_count(), 7); // 12 - 5
        assert_eq!(sg_metrics.total_poll_count(), 15); // 25 - 10
        assert_eq!(sg_metrics.total_idle_count(), 5); // 7 - 2
        assert_eq!(sg_metrics.total_poll_duration(), Duration::from_millis(700)); // 1200 - 500
        assert_eq!(sg_metrics.total_idle_duration(), Duration::from_millis(400)); // 600 - 200
        let hoff_metrics = &second.handoffs[handoff_id];
        assert_eq!(hoff_metrics.total_items_count(), 150); // 250 - 100
        // `curr_items_count` is not diffed, so it is the latest value.
        assert_eq!(hoff_metrics.curr_items_count(), 10);
    }
}