Skip to main content

Networked Services 2: Chat Server

In this example we cover:

  • Multiple message types and the demux operator.
  • A broadcast pattern via the cross_join operator.
  • One-time bootstrapping pipelines
  • A "gated buffer" using defer_signal and persist operators

Our previous echo server example was admittedly simplistic. In this example, we'll build something a bit more useful: a simple chat server. We will again have two roles: a Client and a Server. Clients will register their presence with the Server, which maintains a list of clients. Each Client sends messages to the Server, which will then broadcast those messages to all other clients.

Project files

Cargo.toml

We will use a text-coloring crate called colored in this example. To follow along, add the following line to the bottom of the Cargo.toml file that appears at that root of your template:

colored = "2.0.0"

main.rs

The main.rs file here is very similar to that of the echo server, just with two new command-line arguments: one called name for a "nickname" in the chatroom, and another optional argument graph for printing a dataflow graph if desired. To follow along, you can copy the contents of this file into the src/main.rs file of your template.

use std::net::SocketAddr;

use clap::{Parser, ValueEnum};
use client::run_client;
use hydroflow::util::{bind_udp_bytes, ipv4_resolve};
use hydroflow_lang::graph::{WriteConfig, WriteGraphType};
use server::run_server;

mod client;
mod protocol;
mod server;

#[derive(Clone, ValueEnum, Debug)]
enum Role {
Client,
Server,
}

#[derive(Parser, Debug)]
struct Opts {
#[clap(long)]
name: String,
#[clap(value_enum, long)]
role: Role,
#[clap(long, value_parser = ipv4_resolve)]
addr: Option<SocketAddr>,
#[clap(long, value_parser = ipv4_resolve)]
server_addr: Option<SocketAddr>,
#[clap(long)]
graph: Option<WriteGraphType>,
#[clap(flatten)]
write_config: Option<WriteConfig>,
}

#[hydroflow::main]
async fn main() {
let opts = Opts::parse();
// if no addr was provided, we ask the OS to assign a local port by passing in "localhost:0"
let addr = opts
.addr
.unwrap_or_else(|| ipv4_resolve("localhost:0").unwrap());

// allocate `outbound` sink and `inbound` stream
let (outbound, inbound, addr) = bind_udp_bytes(addr).await;
println!("Listening on {:?}", addr);

match opts.role {
Role::Client => {
run_client(outbound, inbound, opts).await;
}
Role::Server => {
run_server(outbound, inbound, opts).await;
}
}
}

#[test]
fn test() {
use std::io::Write;

use hydroflow::util::{run_cargo_example, wait_for_process_output};

let (_server, _, mut server_output) =
run_cargo_example("chat", "--role server --name server --addr 127.0.0.1:11247");

let mut server_output_so_far = String::new();
wait_for_process_output(
&mut server_output_so_far,
&mut server_output,
"Server live!",
);

let (_client1, mut client1_input, mut client1_output) = run_cargo_example(
"chat",
"--role client --name client1 --server-addr 127.0.0.1:11247",
);

let (_client2, _, mut client2_output) = run_cargo_example(
"chat",
"--role client --name client2 --server-addr 127.0.0.1:11247",
);

let mut client1_output_so_far = String::new();
let mut client2_output_so_far = String::new();

wait_for_process_output(
&mut client1_output_so_far,
&mut client1_output,
"Client live!",
);
wait_for_process_output(
&mut client2_output_so_far,
&mut client2_output,
"Client live!",
);

// wait 100ms so we don't drop a packet
let hundo_millis = std::time::Duration::from_millis(100);
std::thread::sleep(hundo_millis);

client1_input.write_all(b"Hello\n").unwrap();

wait_for_process_output(
&mut client2_output_so_far,
&mut client2_output,
".*, .* client1: Hello",
);
}

protocol.rs

Our protocol file here expands upon what we saw with the echoserver by defining multiple message types. Replace the template contents of src/protocol.rs with the following:

use std::net::SocketAddr;

use chrono::prelude::*;
use hydroflow_macro::DemuxEnum;
use serde::{Deserialize, Serialize};

#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, DemuxEnum)]
pub enum Message {
ConnectRequest,
ConnectResponse,
ChatMsg {
nickname: String,
message: String,
ts: DateTime<Utc>,
},
}

#[derive(Clone, Debug, DemuxEnum)]
pub enum MessageWithAddr {
ConnectRequest {
addr: SocketAddr,
},
ConnectResponse {
addr: SocketAddr,
},
ChatMsg {
addr: SocketAddr,
nickname: String,
message: String,
ts: DateTime<Utc>,
},
}
impl MessageWithAddr {
pub fn from_message(message: Message, addr: SocketAddr) -> Self {
match message {
Message::ConnectRequest => Self::ConnectRequest { addr },
Message::ConnectResponse => Self::ConnectResponse { addr },
Message::ChatMsg {
nickname,
message,
ts,
} => Self::ChatMsg {
addr,
nickname,
message,
ts,
},
}
}
}

Note how we use a single Rust enum to represent all varieties of message types; this allows us to handle Messages of different types with a single Rust network channel. We will use the demux operator to separate out these different message types on the receiving end.

The ConnectRequest and ConnectResponse messages have no payload; the address of the sender and the type of the message will be sufficient information. The ChatMsg message type has a nickname field, a message field, and a ts field for the timestamp. Once again we use the chrono crate to represent timestamps.

server.rs

The chat server is nearly as simple as the echo server. The main differences are (a) we need to handle multiple message types, (b) we need to keep track of the list of clients, and (c) we need to broadcast messages to all clients.

To follow along, replace the contents of src/server.rs with the code below:

use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::{UdpSink, UdpStream};

use crate::protocol::{Message, MessageWithAddr};
use crate::Opts;

pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
println!("Server live!");

let mut hf: Hydroflow = hydroflow_syntax! {
// Define shared inbound and outbound channels
outbound_chan = union() -> dest_sink_serde(outbound);
inbound_chan = source_stream_serde(inbound)
-> map(Result::unwrap)
-> map(|(msg, addr)| MessageWithAddr::from_message(msg, addr))
-> demux_enum::<MessageWithAddr>();
clients = inbound_chan[ConnectRequest] -> map(|(addr,)| addr) -> tee();
inbound_chan[ConnectResponse] -> for_each(|(addr,)| println!("Received unexpected `ConnectResponse` as server from addr {}.", addr));

// Pipeline 1: Acknowledge client connections
clients[0] -> map(|addr| (Message::ConnectResponse, addr)) -> [0]outbound_chan;

// Pipeline 2: Broadcast messages to all clients

After a short prelude, we have the Hydroflow code near the top of run_server(). It begins by defining outbound_chan as a unioned destination sink for network messages. Then we get to the more interesting inbound_chan definition.

The inbound channel is a source stream that will carry many types of Messages. We first use a map operator to unwrap the Rust Result type that comes from deserializing the input from source_stream_serde. Then we use the demux operator to partition the stream objects into three channels. The clients channel will carry the addresses of clients that have connected to the server. The msgs channel will carry the ChatMsg messages that clients send to the server. The errs channel will carry any other messages that clients send to the server.

Note the structure of the demux operator: it takes a closure on (Message, SocketAddr) pairs, and a variadic tuple (var_args!) of the output channel names—in this case clients, msgs, and errs. The closure is basically a big Rust pattern match, with one arm for each output channel name given in the variadic tuple. Note that each output channel can have its own message type! Note also that we destructure the incoming Message types into component fields. (If we didn't we'd have to write boilerplate code to handle every possible Message type in every downstream pipeline!)

The remainder of the server consists of two independent pipelines, the code to print out the flow graph, and the code to run the flow graph. To follow along, paste the following into the bottom of your src/server.rs file:

        clients[1] -> [1]broadcast;
broadcast = cross_join::<'tick, 'static>() -> [1]outbound_chan;
};

if let Some(graph) = opts.graph {
let serde_graph = hf
.meta_graph()
.expect("No graph found, maybe failed to parse.");
serde_graph.open_graph(graph, opts.write_config).unwrap();
}

hf.run_async().await.unwrap();
}

The first pipeline is one line long, and is responsible for acknowledging requests from clients: it takes the address of the incoming Message::ConnectRequest and sends a ConnectResponse back to that address. The second pipeline is responsible for broadcasting all chat messages to all clients. This all-to-all pairing corresponds to the notion of a cartesian product or cross_join in Hydroflow. The cross_join operator takes two input channels and produces a single output channel with a tuple for each pair of inputs, in this case it produces (Message, SocketAddr) pairs. Conveniently, that is exactly the structure needed for sending to the outbound_chan sink! We call the cross-join pipeline broadcast because it effectively broadcasts all messages to all clients.

The mermaid graph for the server is below. The three branches of the demux are very clear toward the top. Note also the tee of the clients channel for both ClientResponse and broadcasting, and the union of all outbound messages into dest_sink_serde.

client.rs

The chat client is not very different from the echo server client, with two new design patterns:

  1. a initialize operator that runs once to "bootstrap" action in the first tick
  2. the use of defer_signal and persist as a "gated buffer" to postpone sending messages.

We also include a Rust helper routine pretty_print_msg for formatting output.

The prelude of the file is almost the same as the echo server client, with the addition of the crate for handling colored text output. This is followed by the pretty_print_msg function, which is fairly self-explanatory. To follow along, start by replacing the contents of src/client.rs with the following:

use chrono::prelude::*;
use colored::Colorize;
use hydroflow::hydroflow_syntax;
use hydroflow::util::{UdpSink, UdpStream};

use crate::protocol::Message;
use crate::Opts;

fn pretty_print_msg(nickname: String, message: String, ts: DateTime<Utc>) {
println!(
"{} {}: {}",
ts.with_timezone(&Local)
.format("%b %-d, %-I:%M:%S")
.to_string()
.truecolor(126, 126, 126)
.italic(),
nickname.green().italic(),
message,
);
}

pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
// server_addr is required for client
let server_addr = opts.server_addr.expect("Client requires a server address");
println!("Client live!");

let mut hf = hydroflow_syntax! {

This brings us to the run_client function. As in run_server we begin by ensuring the server address is supplied. We then have the hydroflow code starting with a standard pattern of a unioned outbound_chan, and a demuxed inbound_chan. The client handles only two inbound Message types: Message::ConnectResponse and Message::ChatMsg.

Paste the following to the bottom of src/client.rs:

        outbound_chan = union() -> dest_sink_serde(outbound);
inbound_chan = source_stream_serde(inbound)
-> map(Result::unwrap)
-> map(|(msg, _addr)| msg)
-> demux_enum::<Message>();
inbound_chan[ConnectRequest] -> for_each(|()| println!("Received unexpected connect request from server."));

// send a single connection request on startup
initialize() -> map(|_m| (Message::ConnectRequest, server_addr)) -> [0]outbound_chan;

// take stdin and send to server as a msg
// the batch serves to buffer msgs until the connection request is acked
lines = source_stdin()
-> map(|l| Message::ChatMsg {
nickname: opts.name.clone(),
message: l.unwrap(),
ts: Utc::now()})

The core logic of the client consists of three dataflow pipelines shown below. Paste this into the bottom of your src/client.rs file.

        inbound_chan[ConnectResponse] -> persist() -> [signal]msg_send;
msg_send = defer_signal() -> map(|msg| (msg, server_addr)) -> [1]outbound_chan;

// receive and print messages
inbound_chan[ChatMsg] -> for_each(|(nick, msg, ts)| pretty_print_msg(nick, msg, ts));
};

// optionally print the dataflow graph
if let Some(graph) = opts.graph {
let serde_graph = hf
.meta_graph()
.expect("No graph found, maybe failed to parse.");
serde_graph.open_graph(graph, opts.write_config).unwrap();
}

hf.run_async().await.unwrap();
}
  1. The first pipeline is the "bootstrap" alluded to above. It starts with the initialize operator that emits a single, opaque "unit" (()) value. This value is emitted when the client begins, which means this pipeline runs once, immediately on startup, and generates a single ConnectRequest message which is sent to the server.

  2. The second pipeline reads from source_stdin and sends messages to the server. It differs from our echo-server example in the use of the defer_signal operator, which buffers up messages until a ConnectResponse is received. The flow assigned to the lines variable takes chat messages from stdin and passes them to the [input] channel of the defer_signal. The defer_signal operator buffers these messages until it gets an input on its [signal] channel. Then all [input] data buffered from previous ticks is passed along to the output, along with any data that streams in during the current tick. In our chat example, we want messages to be sent to the server in all subsequent ticks after ConnectResponse is received! To enforce this, we need to send something on the [signal] channel of defer_signal every subsequent tick. We achieve this by interposing a persist between inbound_chan[acks] and [signal]msg_send. The persist operator stores its input data in order across time, and replays its current contents each tick. In this case it is storing ConnectResponse messages, of which we expect only one. The persist op will replay this signal every tick after it is received, so the client will always send its messages to the server once connected.

  3. The final pipeline simply pretty-prints the messages received from the server.

Finish up the file by pasting the code below for optionally generating the graph and running the flow:


The client's mermaid graph looks a bit different than the server's, mostly because it routes some data to the screen rather than to an outbound network channel.

Running the example

As described in hydroflow/hydroflow/example/chat/README.md, we can run the server in one terminal, and run clients in additional terminals. The server's addr and the client's server-addr need to agree or this won't work!

Fire up the server in terminal 1:

cargo run -- --name "_" --role server --addr 127.0.0.1:12347

Start client "alice" in terminal 2 and type some messages, and you'll see them echoed back to you. This will appear in colored fonts in most terminals (but unfortunately not in this markdown-based book!)

cargo run -- --name "alice" --role client --server-addr 127.0.0.1:12347
Listening on 127.0.0.1:50460
Client live!
Hello (hello hello) ... is there anybody in here?
May 31, 5:12:23 alice: Hello (hello hello) ... is there anybody in here?
Just nod if you can hear me.
May 31, 5:12:36 alice: Just nod if you can hear me.
Is there anyone home?
May 31, 5:12:40 alice: Is there anyone home?

Now start client "bob" in terminal 3, and notice how he instantly receives the backlog of Alice's messages from the server's cross_join. (The messages may not be printed in the same order as they were timestamped! The cross_join operator is not guaranteed to preserve order, nor is the udp network. Fixing these issues requires extra client logic (perhaps using the sort() operator) that we leave as an exercise to the reader.)

cargo run -- --name "bob" --role client --server-addr 127.0.0.1:12347
Listening on 127.0.0.1:49298
Client live!
May 31, 5:12:23 alice: Hello (hello hello) ... is there anybody in here?
May 31, 5:12:36 alice: Just nod if you can hear me.
May 31, 5:12:40 alice: Is there anyone home?

Now in terminal 3, Bob can respond:

*nods*
May 31, 5:13:43 bob: *nods*

and if we go back to terminal 2 we can see that Alice gets the message too:

May 31, 5:13:43 bob: *nods*