dfir_rs/scheduled/net/mod.rs
#![cfg(not(target_arch = "wasm32"))]
#![allow(clippy::allow_attributes, missing_docs, reason = "// TODO(mingwei)")]

//! This module contains networking code.
//!
//! ## How Tokio interacts with the DFIR runtime (Mingwei 2021-12-07)
//!
//! [Tokio](https://tokio.rs/) is a Rust async runtime. In Rust's async/await
//! system, `Future`s must be spawned or sent to an async runtime in order to
//! run. Tokio is the most popular provider of such a runtime, with
//! [async-std](https://async.rs/) (mirrors std lib) and [smol](https://github.com/smol-rs/smol)
//! (minimal runtime) as commonly used alternatives.
//!
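//! For example, a future does nothing until it is handed to a runtime; a
//! minimal sketch of spawning one onto Tokio:
//!
//! ```ignore
//! #[tokio::main]
//! async fn main() {
//!     // `tokio::spawn` hands the future to Tokio's scheduler, which polls it
//!     // to completion concurrently with the rest of `main`.
//!     let handle = tokio::spawn(async { 1 + 2 });
//!     assert_eq!(3, handle.await.unwrap());
//! }
//! ```
//!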
//! Fundamentally, an async runtime's job is to poll futures (read: run tasks)
//! when they are ready to make progress. However, async runtimes also provide
//! `Future`-based abstractions for async events such as timers, network IO, and
//! filesystem IO. To do this, [Tokio](https://tokio.rs/) uses [Mio](https://github.com/tokio-rs/mio),
//! which is a low-level non-blocking API for IO event notification/polling.
//! A user of Mio can write an event loop, i.e. something like: wait for
//! events, run computations responding to those events, repeat. Tokio provides
//! the higher-level async/await and `Future` abstraction on top of Mio, as
//! well as the runtime to execute those `Future`s. Essentially, the Tokio
//! async runtime replaces the low-level event loop a user might handwrite
//! when using Mio.
//!
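//! The "handwritten event loop" mentioned above has roughly the following
//! shape (a sketch against Mio's `Poll`/`Events` API; Mio is not a direct
//! dependency of this module):
//!
//! ```ignore
//! use mio::net::TcpListener;
//! use mio::{Events, Interest, Poll, Token};
//!
//! let mut poll = Poll::new()?;
//! let mut events = Events::with_capacity(128);
//! let mut listener = TcpListener::bind("127.0.0.1:0".parse()?)?;
//! poll.registry()
//!     .register(&mut listener, Token(0), Interest::READABLE)?;
//! loop {
//!     // Wait for IO readiness events...
//!     poll.poll(&mut events, None)?;
//!     // ...then run computations responding to those events, and repeat.
//!     for event in events.iter() {
//!         if event.token() == Token(0) {
//!             let (_conn, _addr) = listener.accept()?;
//!             // Handle the new connection here.
//!         }
//!     }
//! }
//! ```
//!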
//! For context, both Mio and Tokio provide socket/UDP/TCP-level network
//! abstractions, which is probably the right layer for us. There are also
//! libraries built on top of Tokio providing nice server/client HTTP APIs
//! like [Hyper](https://hyper.rs/).
//!
//! The DFIR scheduled layer's scheduler is essentially the same as a simple
//! event loop: it runs subgraphs when they have data. We have also let it
//! respond to external asynchronous events by providing a threadsafe channel
//! through which subgraphs can be externally scheduled.
//!
//! To add networking to DFIR, our current implementation uses Tokio and
//! provides a compatibility mechanism for working with `Future`s. A `Future`
//! provides a `Waker` mechanism to notify the runtime when it has work to do,
//! so we have hooked these `Waker`s up with DFIR's threadsafe external
//! scheduling channel. This essentially turns DFIR into a simple async
//! runtime.
//!
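//! That hookup has roughly the following shape: a `Waker` whose `wake()` sends
//! a subgraph ID down a threadsafe channel, which the scheduler drains to
//! decide what to run next. This is only a sketch (the names are illustrative,
//! not the actual scheduler types):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use futures::task::ArcWake;
//!
//! struct SubgraphWaker {
//!     subgraph_id: usize,
//!     sender: std::sync::mpsc::SyncSender<usize>,
//! }
//!
//! impl ArcWake for SubgraphWaker {
//!     fn wake_by_ref(arc_self: &Arc<Self>) {
//!         // Waking the future just re-schedules its owning subgraph.
//!         let _ = arc_self.sender.send(arc_self.subgraph_id);
//!     }
//! }
//!
//! let (sender, _external_recv) = std::sync::mpsc::sync_channel(1024);
//! // `futures::task::waker` turns the `ArcWake` into a real `std::task::Waker`,
//! // which can be wrapped in a `std::task::Context` and passed to `Future::poll`.
//! let waker = futures::task::waker(Arc::new(SubgraphWaker { subgraph_id: 0, sender }));
//! ```
//!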
//! However, in some situations we still need to run futures outside of
//! DFIR's basic runtime. It's not a goal for DFIR to provide all
//! the features of a full runtime like Tokio. Currently, for this situation,
//! we run DFIR as a task (`Future`) within the Tokio runtime. In DFIR's
//! event loop we do all available work, then rather than block and wait for
//! external events to schedule more tasks, we temporarily yield back to the
//! Tokio runtime. Tokio will then respond to any outstanding events it has
//! before once again running the DFIR scheduler task.
//!
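//! In other words, the DFIR task behaves roughly like the sketch below
//! (method names are illustrative, not the exact public API):
//!
//! ```ignore
//! #[tokio::main]
//! async fn main() {
//!     let mut df = build_dataflow(); // hypothetical constructor for a DFIR graph
//!     loop {
//!         // Run every subgraph that currently has work available.
//!         df.run_available();
//!         // Rather than blocking, yield back to Tokio so it can service
//!         // timers, network IO, and other tasks before resuming us.
//!         tokio::task::yield_now().await;
//!     }
//! }
//! ```
//!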
//! This works perfectly well but maybe isn't the best solution long-term.
//! In the future we may want to remove the extra Tokio runtime layer and
//! interface with Mio directly. In this case we would have to do our own
//! socket-style polling within the DFIR scheduler's event loop, which
//! would require some extra work and thought. But for now interfacing with
//! Tokio works and I don't think the overhead of the extra runtime loop is
//! significant.

use std::collections::VecDeque;
use std::pin::Pin;

use byteorder::{NetworkEndian, WriteBytesExt};
use futures::{Sink, StreamExt};
use tokio::net::TcpStream;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

use super::graph::Dfir;
use super::graph_ext::GraphExt;
use super::handoff::VecHandoff;
use super::port::{RecvPort, SendPort};

pub mod network_vertex;

const ADDRESS_LEN: usize = 4;

// TODO(justin): I don't think we should include the address here, that should
// just be a part of the bytes being sent.
/// A message sent or received over a TCP stream: a `u32` `address` plus a
/// `batch` of payload bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Message {
    pub address: u32,
    pub batch: bytes::Bytes,
}

impl Message {
    /// Encodes this message into `v`: the big-endian `address` followed by the `batch` bytes.
    fn encode(&self, v: &mut Vec<u8>) {
        v.write_u32::<NetworkEndian>(self.address).unwrap();
        v.extend(self.batch.iter());
    }

    /// Decodes a message from `v`: the first `ADDRESS_LEN` bytes are the big-endian
    /// `address`, the remainder is the `batch`.
    pub fn decode(v: bytes::Bytes) -> Self {
        let address = u32::from_be_bytes(v[0..ADDRESS_LEN].try_into().unwrap());
        let batch = v.slice(ADDRESS_LEN..);
        Message { address, batch }
    }
}

impl Dfir<'_> {
    /// Registers `reader` as a length-delimited framed TCP ingress, returning the
    /// receive port from which decoded [`Message`]s can be read.
    fn register_read_tcp_stream(&mut self, reader: OwnedReadHalf) -> RecvPort<VecHandoff<Message>> {
        let reader = FramedRead::new(reader, LengthDelimitedCodec::new());
        let (send_port, recv_port) = self.make_edge("tcp ingress handoff");
        self.add_input_from_stream(
            "tcp ingress",
            send_port,
            reader.map(|buf| Some(<Message>::decode(buf.unwrap().into()))),
        );
        recv_port
    }

    /// Registers `writer` as a length-delimited framed TCP egress, returning the
    /// send port into which [`Message`]s can be written.
    fn register_write_tcp_stream(
        &mut self,
        writer: OwnedWriteHalf,
    ) -> SendPort<VecHandoff<Message>> {
        let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
        let mut message_queue = VecDeque::new();

        let (input_port, output_port) =
            self.make_edge::<_, VecHandoff<Message>>("tcp egress handoff");
        self.add_subgraph_sink("tcp egress", output_port, move |ctx, recv| {
            // Waking this waker re-schedules this subgraph via DFIR's external
            // scheduling channel (see the module docs above).
            let waker = ctx.waker();
            let mut cx = std::task::Context::from_waker(&waker);

            // TODO(mingwei): queue may grow unbounded? Subtle rate matching concern.
            // TODO(mingwei): put into state system.
            message_queue.extend(recv.take_inner());
            while !message_queue.is_empty() {
                if let std::task::Poll::Ready(Ok(())) = Pin::new(&mut writer).poll_ready(&mut cx) {
                    let v = message_queue.pop_front().unwrap();
                    let mut buf = Vec::new();
                    v.encode(&mut buf);

                    Pin::new(&mut writer).start_send(buf.into()).unwrap();
                }
            }
            let _ = Pin::new(&mut writer).poll_flush(&mut cx);
        });

        input_port
    }

    pub fn add_write_tcp_stream(&mut self, stream: TcpStream) -> SendPort<VecHandoff<Message>> {
        let (_, writer) = stream.into_split();

        self.register_write_tcp_stream(writer)
    }

    pub fn add_read_tcp_stream(&mut self, stream: TcpStream) -> RecvPort<VecHandoff<Message>> {
        let (reader, _) = stream.into_split();

        self.register_read_tcp_stream(reader)
    }

    /// Splits `stream` and registers both halves, returning the egress send port
    /// and ingress receive port.
    pub fn add_tcp_stream(
        &mut self,
        stream: TcpStream,
    ) -> (SendPort<VecHandoff<Message>>, RecvPort<VecHandoff<Message>>) {
        let (reader, writer) = stream.into_split();

        (
            self.register_write_tcp_stream(writer),
            self.register_read_tcp_stream(reader),
        )
    }
}
165}