dfir_rs/scheduled/
reactor.rs

1//! Module for [`Reactor`].
2
3use tokio::sync::mpsc::UnboundedSender;
4use tokio::sync::mpsc::error::SendError;
5
6use super::SubgraphId;
7
8/// A handle into a specific [super::graph::Dfir] instance for triggering
9/// subgraphs to run, possibly from another thread.
10///
11/// Reactor events are considered to be external events.
12#[derive(Clone)]
13pub struct Reactor {
14    event_queue_send: UnboundedSender<(SubgraphId, bool)>,
15}
16impl Reactor {
17    pub(crate) fn new(event_queue_send: UnboundedSender<(SubgraphId, bool)>) -> Self {
18        Self { event_queue_send }
19    }
20
21    /// Trigger a subgraph as an external event.
22    pub fn trigger(&self, sg_id: SubgraphId) -> Result<(), SendError<(SubgraphId, bool)>> {
23        self.event_queue_send.send((sg_id, true))
24    }
25
26    /// Convert this `Reactor` into a [`std::task::Waker`] for use with async runtimes.
27    pub fn into_waker(self, sg_id: SubgraphId) -> std::task::Waker {
28        use std::sync::Arc;
29
30        use futures::task::ArcWake;
31
32        struct ReactorWaker {
33            reactor: Reactor,
34            sg_id: SubgraphId,
35        }
36        impl ArcWake for ReactorWaker {
37            fn wake_by_ref(arc_self: &Arc<Self>) {
38                arc_self.reactor.trigger(arc_self.sg_id).unwrap(/* TODO(mingwei) */);
39            }
40        }
41
42        let reactor_waker = ReactorWaker {
43            reactor: self,
44            sg_id,
45        };
46        futures::task::waker(Arc::new(reactor_waker))
47    }
48}