dfir_rs/scheduled/
reactor.rs1use tokio::sync::mpsc::UnboundedSender;
4use tokio::sync::mpsc::error::SendError;
5
6use super::SubgraphId;
7
8#[derive(Clone)]
13pub struct Reactor {
14 event_queue_send: UnboundedSender<(SubgraphId, bool)>,
15}
16impl Reactor {
17 pub(crate) fn new(event_queue_send: UnboundedSender<(SubgraphId, bool)>) -> Self {
18 Self { event_queue_send }
19 }
20
21 pub fn trigger(&self, sg_id: SubgraphId) -> Result<(), SendError<(SubgraphId, bool)>> {
23 self.event_queue_send.send((sg_id, true))
24 }
25
26 pub fn into_waker(self, sg_id: SubgraphId) -> std::task::Waker {
28 use std::sync::Arc;
29 use std::task::Wake;
30
31 struct ReactorWaker {
32 reactor: Reactor,
33 sg_id: SubgraphId,
34 }
35 impl Wake for ReactorWaker {
36 fn wake(self: Arc<Self>) {
37 self.wake_by_ref();
38 }
39
40 fn wake_by_ref(self: &Arc<Self>) {
41 let _recv_closed_error = self.reactor.trigger(self.sg_id);
42 }
43 }
44
45 let reactor_waker = ReactorWaker {
46 reactor: self,
47 sg_id,
48 };
49 std::task::Waker::from(Arc::new(reactor_waker))
50 }
51}