dfir_rs/scheduled/net/mod.rs

#![cfg(not(target_arch = "wasm32"))]
#![allow(clippy::allow_attributes, missing_docs, reason = "// TODO(mingwei)")]

//! This module contains networking code.
//!
//! ## How Tokio interacts with the DFIR runtime (Mingwei 2021-12-07)
//!
//! [Tokio](https://tokio.rs/) is a Rust async runtime. In Rust's async/await
//! system, `Future`s must be spawned or sent to an async runtime in order to
//! run. Tokio is the most popular of these runtimes, with
//! [async-std](https://async.rs/) (mirrors std lib) and [smol](https://github.com/smol-rs/smol)
//! (minimal runtime) as commonly used alternatives.
//!
//! Fundamentally, an async runtime's job is to poll futures (read: run tasks)
//! when they are ready to make progress. However, async runtimes also provide
//! `Future` abstractions for async events such as timers, network IO, and
//! filesystem IO. To do this, [Tokio](https://tokio.rs/) uses [Mio](https://github.com/tokio-rs/mio),
//! a low-level non-blocking API for IO event notification/polling. A user of
//! Mio can write an event loop, i.e. something like: wait for events, run
//! computations responding to those events, repeat. Tokio provides the
//! higher-level async/await and `Future` abstractions on top of Mio, as well
//! as the runtime to execute those `Future`s. The Tokio async runtime
//! essentially replaces the low-level event loop a user might handwrite when
//! using Mio, as sketched below.
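//!
//! For illustration, a handwritten Mio event loop looks roughly like this (a
//! sketch using Mio's `Poll`/`Events` API; not code from this crate):
//!
//! ```rust,ignore
//! use mio::net::TcpListener;
//! use mio::{Events, Interest, Poll, Token};
//!
//! let mut poll = Poll::new()?;
//! let mut events = Events::with_capacity(128);
//! let mut listener = TcpListener::bind("127.0.0.1:0".parse()?)?;
//! poll.registry().register(&mut listener, Token(0), Interest::READABLE)?;
//! loop {
//!     // Block until the OS reports readiness, then respond to each event.
//!     poll.poll(&mut events, None)?;
//!     for event in events.iter() {
//!         if event.token() == Token(0) {
//!             let (_socket, _addr) = listener.accept()?;
//!             // ... handle the new connection ...
//!         }
//!     }
//! }
//! ```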
//!
//! For context, both Mio and Tokio provide socket/UDP/TCP-level network
//! abstractions, which is probably the right layer for us. There are also
//! libraries built on top of Tokio providing nice server/client HTTP APIs
//! like [Hyper](https://hyper.rs/).
//!
//! The DFIR scheduled layer's scheduler is essentially the same as a simple
//! event loop: it runs subgraphs when they have data. We have also let it
//! respond to external asynchronous events by providing a threadsafe channel
//! through which subgraphs can be externally scheduled.
//!
//! To add networking to DFIR, our current implementation uses Tokio along
//! with a compatibility mechanism for working with `Future`s. A `Future`
//! provides a `Waker` mechanism to notify the runtime when it has work to do,
//! so we hook these `Waker`s up to DFIR's threadsafe external scheduling
//! channel. This essentially turns DFIR into a simple async runtime.
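//!
//! Conceptually, the hookup looks something like the sketch below (hypothetical
//! names, not the crate's actual internals): a `Waker` that, when woken,
//! re-schedules its subgraph by sending the subgraph's ID over the scheduler's
//! channel.
//!
//! ```rust,ignore
//! use std::sync::Arc;
//! use std::sync::mpsc::SyncSender;
//!
//! use futures::task::{self, ArcWake};
//!
//! struct SubgraphWaker {
//!     event_send: SyncSender<SubgraphId>,
//!     subgraph_id: SubgraphId,
//! }
//!
//! impl ArcWake for SubgraphWaker {
//!     fn wake_by_ref(arc_self: &Arc<Self>) {
//!         // Waking just enqueues the subgraph; the scheduler drains this channel.
//!         let _ = arc_self.event_send.send(arc_self.subgraph_id);
//!     }
//! }
//!
//! let waker = task::waker(Arc::new(SubgraphWaker { event_send, subgraph_id }));
//! let mut cx = std::task::Context::from_waker(&waker);
//! // Poll futures with `cx`; a `Pending` future will wake us back up via the channel.
//! ```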
//!
//! However, in some situations we still need to run futures outside of
//! DFIR's basic runtime; it's not a goal for DFIR to provide all the
//! features of a full runtime like Tokio. Currently we handle this by running
//! DFIR as a task (`Future`) within the Tokio runtime. In DFIR's event loop
//! we do all available work, then, rather than block and wait for external
//! events to schedule more tasks, we temporarily yield back to the Tokio
//! runtime. Tokio will then respond to any outstanding events it has before
//! once again running the DFIR scheduler task.
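//!
//! In practice this means driving the graph from inside a Tokio runtime, along
//! these lines (a sketch; `build_graph` is hypothetical and `run_async` stands
//! in for a driver that yields back to Tokio when there is no DFIR work left):
//!
//! ```rust,ignore
//! #[tokio::main]
//! async fn main() {
//!     let mut df = build_graph(); // construct a `Dfir` graph with some subgraphs
//!     // Run the DFIR scheduler as a Tokio task: do all available work, then yield
//!     // so Tokio's reactor can deliver pending IO/timer events and wake us again.
//!     df.run_async().await;
//! }
//! ```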
//!
//! This works perfectly well but maybe isn't the best solution long-term.
//! In the future we may want to remove the extra Tokio runtime layer and
//! interface with Mio directly. In this case we would have to do our own
//! socket-style polling within the DFIR scheduler's event loop, which
//! would require some extra work and thought. But for now interfacing with
//! Tokio works and I don't think the overhead of the extra runtime loop is
//! significant.

use std::collections::VecDeque;
use std::pin::Pin;

use byteorder::{NetworkEndian, WriteBytesExt};
use futures::{Sink, StreamExt};
use tokio::net::TcpStream;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

use super::graph::Dfir;
use super::graph_ext::GraphExt;
use super::handoff::VecHandoff;
use super::port::{RecvPort, SendPort};

pub mod network_vertex;

const ADDRESS_LEN: usize = 4;

// TODO(justin): I don't think we should include the address here, that should
// just be a part of the bytes being sent.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Message {
    pub address: u32,
    pub batch: bytes::Bytes,
}

impl Message {
    fn encode(&self, v: &mut Vec<u8>) {
        v.write_u32::<NetworkEndian>(self.address).unwrap();
        v.extend(self.batch.iter());
    }

    pub fn decode(v: bytes::Bytes) -> Self {
        let address = u32::from_be_bytes(v[0..ADDRESS_LEN].try_into().unwrap());
        let batch = v.slice(ADDRESS_LEN..);
        Message { address, batch }
    }
}
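
// A minimal round-trip check of the `Message` wire layout above: a 4-byte
// big-endian address followed by the raw payload bytes (length-delimited framing
// is applied separately by the TCP codec below). Illustrative sketch only.
#[cfg(test)]
mod message_tests {
    use super::Message;

    #[test]
    fn message_roundtrip() {
        let msg = Message {
            address: 42,
            batch: bytes::Bytes::from_static(b"hello"),
        };
        let mut buf = Vec::new();
        msg.encode(&mut buf);
        assert_eq!(msg, Message::decode(bytes::Bytes::from(buf)));
    }
}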

impl Dfir<'_> {
    /// Registers `reader` as a length-delimited ingress stream, decoding each frame
    /// into a [`Message`] and returning the receive port it feeds.
    fn register_read_tcp_stream(&mut self, reader: OwnedReadHalf) -> RecvPort<VecHandoff<Message>> {
        let reader = FramedRead::new(reader, LengthDelimitedCodec::new());
        let (send_port, recv_port) = self.make_edge("tcp ingress handoff");
        self.add_input_from_stream(
            "tcp ingress",
            send_port,
            // Note: the `unwrap()` panics if the underlying TCP read fails.
            reader.map(|buf| Some(<Message>::decode(buf.unwrap().into()))),
        );
        recv_port
    }

    /// Registers `writer` as a length-delimited egress sink and returns the send
    /// port that feeds it.
    fn register_write_tcp_stream(
        &mut self,
        writer: OwnedWriteHalf,
    ) -> SendPort<VecHandoff<Message>> {
        let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
        let mut message_queue = VecDeque::new();

        let (input_port, output_port) =
            self.make_edge::<_, VecHandoff<Message>>("tcp egress handoff");
        self.add_subgraph_sink("tcp egress", output_port, move |ctx, recv| {
            let waker = ctx.waker();
            let mut cx = std::task::Context::from_waker(&waker);

            // TODO(mingwei): queue may grow unbounded? Subtle rate matching concern.
            // TODO(mingwei): put into state system.
            message_queue.extend(recv.take_inner());
            while !message_queue.is_empty() {
                match Pin::new(&mut writer).poll_ready(&mut cx) {
                    std::task::Poll::Ready(Ok(())) => {
                        let v = message_queue.pop_front().unwrap();
                        let mut buf = Vec::new();
                        v.encode(&mut buf);

                        Pin::new(&mut writer).start_send(buf.into()).unwrap();
                    }
                    // Not ready (or errored): stop draining. On `Pending` the waker has
                    // been registered, so this subgraph will be re-scheduled when the
                    // sink becomes writable; remaining messages stay queued until then.
                    _ => break,
                }
            }
            let _ = Pin::new(&mut writer).poll_flush(&mut cx);
        });

        input_port
    }

    /// Registers the write half of `stream` as a message egress, returning the send
    /// port for outgoing [`Message`]s. The read half is dropped.
    pub fn add_write_tcp_stream(&mut self, stream: TcpStream) -> SendPort<VecHandoff<Message>> {
        let (_, writer) = stream.into_split();

        self.register_write_tcp_stream(writer)
    }

    /// Registers the read half of `stream` as a message ingress, returning the receive
    /// port for incoming [`Message`]s. The write half is dropped.
    pub fn add_read_tcp_stream(&mut self, stream: TcpStream) -> RecvPort<VecHandoff<Message>> {
        let (reader, _) = stream.into_split();

        self.register_read_tcp_stream(reader)
    }

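    /// Registers both halves of `stream`, returning the egress send port and the
    /// ingress receive port as a pair.
    ///
    /// A rough usage sketch (illustrative names and address; assumes a Tokio runtime
    /// is driving the graph as described in the module docs):
    ///
    /// ```rust,ignore
    /// let mut df = Dfir::new();
    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
    /// let (egress_send, ingress_recv) = df.add_tcp_stream(stream);
    /// // Wire `egress_send` / `ingress_recv` into subgraphs, then drive `df` from
    /// // within the Tokio runtime as described in the module docs.
    /// ```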
    pub fn add_tcp_stream(
        &mut self,
        stream: TcpStream,
    ) -> (SendPort<VecHandoff<Message>>, RecvPort<VecHandoff<Message>>) {
        let (reader, writer) = stream.into_split();

        (
            self.register_write_tcp_stream(writer),
            self.register_read_tcp_stream(reader),
        )
    }
}