#![cfg(not(target_arch = "wasm32"))]
#![allow(clippy::allow_attributes, missing_docs, reason = "// TODO(mingwei)")]
//! This module contains networking code.
//!
//! ## How Tokio interacts with Hydroflow (Mingwei 2021-12-07)
//!
//! [Tokio](https://tokio.rs/) is a Rust async runtime. In Rust's async/await
//! system, `Future`s must be spawned or sent to an async runtime in order to
//! run. Tokio is the most popular provider of one of these runtimes, with
//! [async-std](https://async.rs/) (mirrors std lib) and [smol](https://github.com/smol-rs/smol)
//! (minimal runtime) as commonly used alternatives.
//!
//! Fundamentally, an async runtime's job is to poll futures (read: run tasks)
//! when they are ready to make progress. However async runtimes also provide a
//! `Future`s abstraction for async events such as timers, network IO, and
//! filesystem IO. To do this, [Tokio](https://tokio.rs/) uses [Mio](https://github.com/tokio-rs/mio)
//! which is a low-level non-blocking API for IO event notification/polling.
//! A user of Mio can write an event loop, i.e. something like: wait for
//! events, run computations responding to those events, repeat. Tokio provides
//! the higher-level async/await and `Future` abstraction on top of Mio, as
//! well as the runtime to execute those `Future`s. The Tokio async runtime
//! essentially replaces the low-level event loop a user might otherwise
//! handwrite when using Mio.
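//!
//! For concreteness, here is a minimal sketch of the kind of event loop a user
//! might handwrite directly against Mio (using Mio 0.8-style APIs; Mio may not
//! be a direct dependency here, so this is illustrative only):
//!
//! ```rust,ignore
//! use mio::net::TcpListener;
//! use mio::{Events, Interest, Poll, Token};
//!
//! fn main() -> std::io::Result<()> {
//!     let mut poll = Poll::new()?;
//!     let mut events = Events::with_capacity(128);
//!     let mut listener = TcpListener::bind("127.0.0.1:0".parse().unwrap())?;
//!     poll.registry()
//!         .register(&mut listener, Token(0), Interest::READABLE)?;
//!     loop {
//!         // Wait for events...
//!         poll.poll(&mut events, None)?;
//!         for event in events.iter() {
//!             // ...run computations responding to those events, repeat.
//!             if event.token() == Token(0) {
//!                 let _ = listener.accept();
//!             }
//!         }
//!     }
//! }
//! ```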
//!
//! For context, both Mio and Tokio provide socket/UDP/TCP-level network
//! abstractions, which is probably the right layer for us. There are also
//! libraries built on top of Tokio providing nice server/client HTTP APIs
//! like [Hyper](https://hyper.rs/).
//!
//! The Hydroflow scheduled layer's scheduler is essentially a simple event
//! loop: it runs subgraphs when they have data. We also let it respond to
//! external asynchronous events by providing a threadsafe channel through
//! which subgraphs can be externally scheduled.
//!
//! In order to add networking to Hydroflow, our current implementation uses
//! Tokio along with a compatibility mechanism for working with `Future`s. A
//! `Future` is polled with a `Waker`, which it uses to signal when it has
//! work to do, so we hook these `Waker`s up to Hydroflow's threadsafe external
//! scheduling channel. This essentially turns Hydroflow into a simple async
//! runtime.
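//!
//! As a rough sketch of that hookup (with hypothetical names, not the actual
//! Hydroflow internals), a `Waker` can be backed by a threadsafe channel so
//! that waking a future enqueues the corresponding subgraph for scheduling:
//!
//! ```rust,ignore
//! use std::sync::{mpsc, Arc};
//!
//! use futures::task::{self, ArcWake};
//!
//! /// Hypothetical waker: waking it sends a subgraph ID over a threadsafe
//! /// channel, which the scheduler drains to decide what to run next.
//! struct ChannelWaker {
//!     subgraph_id: usize,
//!     sender: mpsc::SyncSender<usize>,
//! }
//!
//! impl ArcWake for ChannelWaker {
//!     fn wake_by_ref(arc_self: &Arc<Self>) {
//!         // Ignore the error if the scheduler side has shut down.
//!         let _ = arc_self.sender.try_send(arc_self.subgraph_id);
//!     }
//! }
//!
//! fn main() {
//!     let (sender, receiver) = mpsc::sync_channel(8);
//!     let waker = task::waker(Arc::new(ChannelWaker { subgraph_id: 0, sender }));
//!     // Polling a future with `waker` means that when the future wakes, the
//!     // scheduler sees subgraph 0 appear on the channel.
//!     waker.wake_by_ref();
//!     assert_eq!(Ok(0), receiver.try_recv());
//! }
//! ```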
//!
//! However, in some situations we still need to run futures outside of
//! Hydroflow's basic runtime. It's not a goal for Hydroflow to provide all
//! the features of a full runtime like Tokio. Currently for this situation we
//! run Hydroflow as a task (`Future`) within the Tokio runtime. In Hydroflow's
//! event loop we do all available work, then rather than block and wait for
//! external events to schedule more tasks, we temporarily yield back to the
//! Tokio runtime. Tokio will then respond to any outstanding events it has
//! before once again running the Hydroflow scheduler task.
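//!
//! A minimal sketch of that arrangement (assuming a hypothetical
//! `run_available_work` standing in for one pass of the Hydroflow scheduler):
//!
//! ```rust,ignore
//! /// Hypothetical stand-in for one pass of the Hydroflow scheduler: run all
//! /// subgraphs that currently have data, then return.
//! fn run_available_work() {}
//!
//! #[tokio::main]
//! async fn main() {
//!     // Run the "scheduler" as a task (`Future`) inside the Tokio runtime.
//!     let scheduler = tokio::task::spawn(async {
//!         for _ in 0..10 {
//!             // Do all currently available work...
//!             run_available_work();
//!             // ...then, rather than block on external events, yield back to
//!             // Tokio so it can service outstanding IO/timer events before
//!             // polling this task again.
//!             tokio::task::yield_now().await;
//!         }
//!     });
//!     scheduler.await.unwrap();
//! }
//! ```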
//!
//! This works perfectly well but maybe isn't the best solution long-term.
//! In the future we may want to remove the extra Tokio runtime layer and
//! interface with Mio directly. In this case we would have to do our own
//! socket-style polling within the Hydroflow scheduler's event loop, which
//! would require some extra work and thought. But for now, interfacing with
//! Tokio works, and I don't think the overhead of the extra runtime loop is
//! significant.

use std::collections::VecDeque;
use std::pin::Pin;

use byteorder::{NetworkEndian, WriteBytesExt};
use futures::{Sink, StreamExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

use super::graph::Dfir;
use super::graph_ext::GraphExt;
use super::handoff::VecHandoff;
use super::port::{RecvPort, SendPort};

pub mod network_vertex;

const ADDRESS_LEN: usize = 4;

// TODO(justin): I don't think we should include the address here, that should
// just be a part of the bytes being sent.
/// A message to send over one of this module's TCP streams: an address tag
/// plus a batch of payload bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Message {
    pub address: u32,
    pub batch: bytes::Bytes,
}

impl Message {
    /// Encodes `self` into `v` as a 4-byte network-order (big-endian) address
    /// followed by the raw batch bytes.
    fn encode(&self, v: &mut Vec<u8>) {
        v.write_u32::<NetworkEndian>(self.address).unwrap();
        v.extend(self.batch.iter());
    }

    /// Decodes a message from the wire format produced by [`Message::encode`].
    pub fn decode(v: bytes::Bytes) -> Self {
        let address = u32::from_be_bytes(v[0..ADDRESS_LEN].try_into().unwrap());
        let batch = v.slice(ADDRESS_LEN..);
        Message { address, batch }
    }
}
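
// A small round-trip check of the wire format above (added here as a sketch):
// a 4-byte network-order address prefix followed by the raw batch bytes.
#[cfg(test)]
mod message_round_trip_tests {
    use super::Message;

    #[test]
    fn message_round_trip() {
        let msg = Message {
            address: 42,
            batch: bytes::Bytes::from_static(b"hello"),
        };

        let mut buf = Vec::new();
        msg.encode(&mut buf);
        // 4-byte address prefix + 5 payload bytes.
        assert_eq!(4 + 5, buf.len());
        assert_eq!(msg, Message::decode(bytes::Bytes::from(buf)));
    }
}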

impl Dfir<'_> {
    fn register_read_tcp_stream(
        &mut self,
        reader: OwnedReadHalf,
    ) -> RecvPort<VecHandoff<Message>> {
        let reader = FramedRead::new(reader, LengthDelimitedCodec::new());
        let (send_port, recv_port) = self.make_edge("tcp ingress handoff");
        self.add_input_from_stream(
            "tcp ingress",
            send_port,
            reader.map(|buf| Some(<Message>::decode(buf.unwrap().into()))),
        );
        recv_port
    }

    fn register_write_tcp_stream(
        &mut self,
        writer: OwnedWriteHalf,
    ) -> SendPort<VecHandoff<Message>> {
        let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
        let mut message_queue = VecDeque::new();

        let (input_port, output_port) =
            self.make_edge::<_, VecHandoff<Message>>("tcp egress handoff");
        self.add_subgraph_sink("tcp egress", output_port, move |ctx, recv| {
            let waker = ctx.waker();
            let mut cx = std::task::Context::from_waker(&waker);

            // TODO(mingwei): queue may grow unbounded? Subtle rate matching concern.
            // TODO(mingwei): put into state system.
            message_queue.extend(recv.take_inner());
            while !message_queue.is_empty() {
                if let std::task::Poll::Ready(Ok(())) =
                    Pin::new(&mut writer).poll_ready(&mut cx)
                {
                    let v = message_queue.pop_front().unwrap();
                    let mut buf = Vec::new();
                    v.encode(&mut buf);
                    Pin::new(&mut writer).start_send(buf.into()).unwrap();
                }
            }
            let _ = Pin::new(&mut writer).poll_flush(&mut cx);
        });

        input_port
    }

    /// Registers the write half of `stream` as a length-delimited sink of
    /// [`Message`]s, returning the send port that feeds it.
    pub fn add_write_tcp_stream(&mut self, stream: TcpStream) -> SendPort<VecHandoff<Message>> {
        let (_, writer) = stream.into_split();
        self.register_write_tcp_stream(writer)
    }

    /// Registers the read half of `stream` as a length-delimited source of
    /// [`Message`]s, returning the receive port to read them from.
    pub fn add_read_tcp_stream(&mut self, stream: TcpStream) -> RecvPort<VecHandoff<Message>> {
        let (reader, _) = stream.into_split();
        self.register_read_tcp_stream(reader)
    }

    /// Registers both halves of `stream`, returning a send port for outgoing
    /// [`Message`]s and a receive port for incoming ones.
    pub fn add_tcp_stream(
        &mut self,
        stream: TcpStream,
    ) -> (SendPort<VecHandoff<Message>>, RecvPort<VecHandoff<Message>>) {
        let (reader, writer) = stream.into_split();
        (
            self.register_write_tcp_stream(writer),
            self.register_read_tcp_stream(reader),
        )
    }
}