dfir_rs/scheduled/
reactor.rs

1//! Module for [`Reactor`].
2
3use tokio::sync::mpsc::UnboundedSender;
4use tokio::sync::mpsc::error::SendError;
5
6use super::SubgraphId;
7
8/// A handle into a specific [super::graph::Dfir] instance for triggering
9/// subgraphs to run, possibly from another thread.
10///
11/// Reactor events are considered to be external events.
12#[derive(Clone)]
13pub struct Reactor {
14    event_queue_send: UnboundedSender<(SubgraphId, bool)>,
15}
16impl Reactor {
17    pub(crate) fn new(event_queue_send: UnboundedSender<(SubgraphId, bool)>) -> Self {
18        Self { event_queue_send }
19    }
20
21    /// Trigger a subgraph as an external event.
22    pub fn trigger(&self, sg_id: SubgraphId) -> Result<(), SendError<(SubgraphId, bool)>> {
23        self.event_queue_send.send((sg_id, true))
24    }
25
26    /// Convert this `Reactor` into a [`std::task::Waker`] for use with async runtimes.
27    pub fn into_waker(self, sg_id: SubgraphId) -> std::task::Waker {
28        use std::sync::Arc;
29        use std::task::Wake;
30
31        struct ReactorWaker {
32            reactor: Reactor,
33            sg_id: SubgraphId,
34        }
35        impl Wake for ReactorWaker {
36            fn wake(self: Arc<Self>) {
37                self.wake_by_ref();
38            }
39
40            fn wake_by_ref(self: &Arc<Self>) {
41                let _recv_closed_error = self.reactor.trigger(self.sg_id);
42            }
43        }
44
45        let reactor_waker = ReactorWaker {
46            reactor: self,
47            sg_id,
48        };
49        std::task::Waker::from(Arc::new(reactor_waker))
50    }
51}