dfir_rs/util/simulation.rs
1//! # Hydro/DFIR Deterministic Simulation Testing Framework
2//!
//! This module provides a deterministic simulation testing framework for Hydro/DFIR
//! processes.
5//!
6//! It can be used to test complex interactions between multiple Hydro/DFIR processes in a
7//! deterministic manner by running them in a single-threaded environment. The framework also
8//! provides a "virtual network" implementation that allows production processes to exchange
//! messages within the simulation. More importantly, the network is fully under the control of the
10//! unit test and the test can introduce faults such as message delays, message drops and
11//! network partitions.
12//!
13//! ## Overview
14//!
15//! Conceptually, the simulation contains a "Fleet", which is a collection of "Hosts". These
16//! aren't real hosts, but rather a collection of individual Hydro/DFIR processes (one per host)
17//! that can communicate with each other over a virtual network. Every host has a "hostname"
18//! which uniquely identifies it within the fleet.
19//!
20//! ```text
21//! ┌───────────────────────────────────────────────────────────────────────────────────────────┐
22//! │SIMULATION │
23//! │ ┌───────────────────────────────────────────────────────────────────────────────────────┐ │
24//! │ │FLEET │ │
25//! │ │ ┌───────────────────────────────┐ ┌───────────────────────────────┐ │ │
26//! │ │ │HOST │ │HOST │ │ │
27//! │ │ │ ┌──────┐ ┌──────┐ ┌──────┐ │ │ ┌──────┐ ┌──────┐ ┌──────┐ │ │ │
28//! │ │ │ │INBOX │ │INBOX │ │INBOX │ │ ┌-┼-►INBOX │ │INBOX │ │INBOX │ │ │ │
29//! │ │ │ └──┬───┘ └──┬───┘ └──┬───┘ │ │ │ └──┬───┘ └──┬───┘ └──┬───┘ │ │ │
30//! │ │ │ ┌──▼──────────▼─────────▼───┐ │ │ │ ┌──▼──────────▼─────────▼───┐ │ │ │
31//! │ │ │ │ │ │ │ │ │ │ │ │ │
32//! │ │ │ │ TRANSDUCER │ │ │ │ │ TRANSDUCER │ │ │ │
33//! │ │ │ │ │ │ │ │ │ │ │ │ │
34//! │ │ │ └───┬─────────┬──────────┬──┘ │ │ │ └───┬─────────┬─────────┬───┘ │ │ │
35//! │ │ │ ┌──▼───┐ ┌──▼───┐ ┌───▼──┐ │ │ │ ┌──▼───┐ ┌──▼───┐ ┌──▼───┐ │ │ │
36//! │ │ │ │OUTBOX│ │OUTBOX│ │OUTBOX┼-┼--┐ │ │ │OUTBOX│ │OUTBOX│ │OUTBOX│ │ │ │
37//! │ │ │ └──────┘ └──────┘ └──────┘ │ │ │ │ └──────┘ └──────┘ └──────┘ │ │ │
38//! │ │ └───────────────────────────────┘ │ │ └───────────────────────────────┘ │ │
39//! │ └────────────────────────────────────┼──────────────┼───────────────────────────────────┘ │
40//! │ ┌─┼──────────────┼─┐ │
41//! │ │ └--------------┘ │ │
42//! │ │ NETWORK MESSAGE │ │
43//! │ │ PROCESSING │ │
44//! │ └──────────────────┘ │
45//! └───────────────────────────────────────────────────────────────────────────────────────────┘
46//! ```
47//! ## Network Processing
48//!
49//! ### Outboxes & Inboxes
50//! When a process wishes to send a message to another process, it sends the message to an
51//! "outbox" on its host. The unit test invokes the simulation's network message processing logic
52//! at some desired cadence to pick up all messages from all outboxes and deliver them to the
53//! corresponding inboxes on the destination hosts. The network message processing logic is the
54//! point at which failures can be injected to change the behavior of the network.
55//!
56//! ### Interface Names
57//! Every inbox and outbox is associated with an "interface name". This is a string that uniquely
58//! identifies the interface on the host. When a process sends a message, it specifies the
59//! destination hostname and the interface name on that host to which the message should be
60//! delivered.
61//!
62//! ## Progress of Time in the Simulation
63//! The single-threaded unit test can drive time forward on every host by invoking the `run_tick`
64//! method on the host. This ultimately runs a single tick on the process. The unit test is
65//! also responsible for invoking the network message processing at the time of its choosing and
66//! can interleave the progress of time on various hosts and network processing as it sees fit.
67//!
68//! ## Examples
//! See the `tests` module at the end of this file for examples of how to use the simulation
//! framework.
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::ready;
use std::pin::Pin;
76
77use futures::{Sink, SinkExt, StreamExt, sink};
78use serde::{Deserialize, Serialize};
79use tokio::sync::mpsc::UnboundedSender;
80use tokio_stream::Stream;
81use tokio_stream::wrappers::UnboundedReceiverStream;
82use tracing::trace;
83
84use crate::scheduled::graph::Dfir;
85use crate::util::{collect_ready_async, unbounded_channel};
86
/// Name that uniquely identifies one host within the simulation.
///
/// Messages are addressed to a host — and therefore to the single Hydro/DFIR process running on
/// it — by this name.
pub type Hostname = String;
90
91/// An interface name is a unique identifier for an inbox or an outbox on host.
92type InterfaceName = String;
93
94/// An address is a combination of a hostname and an interface name.
95#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
96pub struct Address {
97 host: Hostname,
98 interface: InterfaceName,
99}
100
101impl Address {
102 /// Create a new address with the given hostname and interface name.
103 pub fn new(host: Hostname, interface: InterfaceName) -> Self {
104 Address { host, interface }
105 }
106}
107
108/// A message sender is used to send messages to an inbox on a host.
109pub trait MessageSender {
110 /// Send a message to the inbox on the host.
111 fn send(&self, message: MessageWithAddress);
112}
113
114impl<T: 'static> MessageSender for UnboundedSender<(T, Address)> {
115 fn send(&self, message: (Box<dyn Any>, Address)) {
116 match message.0.downcast::<T>() {
117 Ok(msg) => {
118 self.send((*msg, message.1)).unwrap();
119 }
120 Err(e) => {
121 panic!("Failed to downcast message to expected type: {:?}", e);
122 }
123 }
124 }
125}
126
127/// A message with an delivery address.
128pub type MessageWithAddress = (Box<dyn Any>, Address);
129
130/// An inbox is used by a host to receive messages for the process.
131pub struct Inbox {
132 sender: Box<dyn MessageSender>,
133}
134
135/// Processes can send messages to other processes by putting those messages in an outbox
136/// on their host.
137pub struct Outbox {
138 receiver: Pin<Box<dyn Stream<Item = MessageWithAddress>>>,
139}
140
141/// A host is a single Hydro/DFIR process running in the simulation. It has a unique hostname
142/// and can communicate with other hosts over the virtual network. It has a collection of inboxes
143/// and outboxes.
144pub struct Host {
145 name: Hostname,
146 process: Dfir<'static>,
147 inputs: HashMap<InterfaceName, Inbox>,
148 output: HashMap<InterfaceName, Outbox>,
149}
150
151impl Host {
152 /// Run a single tick on the host's process. Returns true if any work was done by the
153 /// process. This effectively "advances" time on the process.
154 pub fn run_tick(&mut self) -> bool {
155 self.process.run_tick()
156 }
157}
158
159/// A builder for constructing a host in the simulation.
160pub struct HostBuilder {
161 name: Hostname,
162 process: Option<Dfir<'static>>,
163 inboxes: HashMap<InterfaceName, Inbox>,
164 outboxes: HashMap<InterfaceName, Outbox>,
165}
166
167/// Used in conjunction with the `HostBuilder` to construct a host in the simulation.
168pub struct ProcessBuilderContext<'context> {
169 inboxes: &'context mut HashMap<InterfaceName, Inbox>,
170 outboxes: &'context mut HashMap<InterfaceName, Outbox>,
171}
172
173fn sink_from_fn<T>(mut f: impl FnMut(T)) -> impl Sink<T, Error = Infallible> {
174 sink::drain().with(move |item| {
175 (f)(item);
176 ready(Result::<(), Infallible>::Ok(()))
177 })
178}
179
180impl ProcessBuilderContext<'_> {
181 /// Create a new inbox on the host with the given interface name. Returns a stream that can
182 /// be read by the process using the source_stream dfir operator.
183 pub fn new_inbox<T: 'static>(
184 &mut self,
185 interface: InterfaceName,
186 ) -> UnboundedReceiverStream<(T, Address)> {
187 let (sender, receiver) = unbounded_channel::<(T, Address)>();
188 self.inboxes.insert(
189 interface,
190 Inbox {
191 sender: Box::new(sender),
192 },
193 );
194 receiver
195 }
196
197 /// Creates a new outbox on the host with the given interface name. Returns a sink that can
198 /// be written to by the process using the dest_sink dfir operator.
199 pub fn new_outbox<T: 'static>(
200 &mut self,
201 interface: InterfaceName,
202 ) -> impl use<T> + Sink<(T, Address), Error = Infallible> {
203 let (sender, receiver) = unbounded_channel::<(T, Address)>();
204
205 let receiver = receiver.map(|(msg, addr)| (Box::new(msg) as Box<dyn Any>, addr));
206
207 self.outboxes.insert(
208 interface,
209 Outbox {
210 receiver: Box::pin(receiver),
211 },
212 );
213
214 sink_from_fn(move |message: (T, Address)| sender.send((message.0, message.1)).unwrap())
215 }
216}
217
218impl HostBuilder {
219 /// Creates a new instance of HostBuilder for a given hostname,
220 pub fn new(name: Hostname) -> Self {
221 HostBuilder {
222 name,
223 process: None,
224 inboxes: Default::default(),
225 outboxes: Default::default(),
226 }
227 }
228
229 /// Supplies the (mandatory) process that runs on this host.
230 pub fn with_process<F>(mut self, builder: F) -> Self
231 where
232 F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
233 {
234 let mut context = ProcessBuilderContext {
235 inboxes: &mut self.inboxes,
236 outboxes: &mut self.outboxes,
237 };
238 let process = builder(&mut context);
239 self.process = Some(process);
240 self
241 }
242
243 /// Builds the host with the supplied configuration.
244 pub fn build(self) -> Host {
245 if self.process.is_none() {
246 panic!("Process is required to build a host");
247 }
248
249 Host {
250 name: self.name,
251 process: self.process.unwrap(),
252 inputs: self.inboxes,
253 output: self.outboxes,
254 }
255 }
256}
257
258/// A fleet is a collection of hosts in the simulation. It is responsible for running the
259/// simulation and processing network messages.
260pub struct Fleet {
261 hosts: HashMap<String, Host>,
262}
263
264impl Fleet {
265 /// Creates a new instance of Fleet.
266 pub fn new() -> Self {
267 Fleet {
268 hosts: HashMap::new(),
269 }
270 }
271
272 /// Adds a new host to the fleet with the given name and process.
273 pub fn add_host<F>(&mut self, name: String, process_builder: F) -> &Host
274 where
275 F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
276 {
277 let host = HostBuilder::new(name.clone())
278 .with_process(process_builder)
279 .build();
280 assert!(
281 self.hosts.insert(host.name.clone(), host).is_none(),
282 "Host with name {} already exists",
283 name
284 );
285 self.get_host(&name).unwrap()
286 }
287
288 /// Get a host by name.
289 pub fn get_host(&self, name: &str) -> Option<&Host> {
290 self.hosts.get(name)
291 }
292
293 /// Get a host by name.
294 pub fn get_host_mut(&mut self, name: &str) -> Option<&mut Host> {
295 self.hosts.get_mut(name)
296 }
297
298 /// Advance time on all hosts by a single tick. Returns true if any work was done by any of the
299 /// hosts. After ticking once on all the hosts, the method also processes network messages.
300 ///
301 /// The order in which the ticks are processed is not guaranteed.
302 pub async fn run_single_tick_all_hosts(&mut self) -> bool {
303 let mut work_done: bool = false;
304
305 for (name, host) in self.hosts.iter_mut() {
306 trace!("Running tick for host: {}", name);
307 work_done |= host.run_tick();
308 }
309
310 self.process_network().await;
311
312 work_done
313 }
314
315 /// Process all network messages in the simulation. This method picks up all messages from all
316 /// outboxes on all hosts and delivers them to the corresponding inboxes on the destination.
317 ///
318 /// The order in which the messages are processed is not guaranteed.
319 pub async fn process_network(&mut self) {
320 let mut all_messages: Vec<(Address, MessageWithAddress)> = Vec::new();
321
322 // Collect all messages from all outboxes on all hosts.
323 for (name, host) in self.hosts.iter_mut() {
324 for (interface, output) in host.output.iter_mut() {
325 let src_address = Address::new(name.clone(), interface.clone());
326 let all_messages_on_interface: Vec<_> =
327 collect_ready_async(&mut output.receiver).await;
328 for message_on_interface in all_messages_on_interface {
329 all_messages.push((src_address.clone(), message_on_interface));
330 }
331 }
332 }
333
334 // Deliver all messages to the corresponding inboxes on the destination hosts.
335 for (src_address, (msg, addr)) in all_messages {
336 if let Some(destination_host) = self.hosts.get(&addr.host) {
337 if let Some(input) = destination_host.inputs.get(&addr.interface) {
338 input.sender.send((msg, src_address.clone()));
339 } else {
340 trace!(
341 "No interface named {:?} found on host {:?}. Dropping message {:?}.",
342 addr.interface, addr.host, msg
343 );
344 }
345 } else {
346 trace!(
347 "No host named {:?} found. Dropping message {:?}.",
348 addr.host, msg
349 );
350 }
351 }
352 }
353
354 /// Tick all hosts until all hosts are quiescent (i.e. no new work is done by any host). Ticking
355 /// is done in "rounds". At each round, all hosts are ticked once and then network messages are
356 /// processed. The process continues until no work is done by any host in a round.
357 pub async fn run_until_quiescent(&mut self) {
358 while self.run_single_tick_all_hosts().await {}
359 }
360}
361
362impl Default for Fleet {
363 fn default() -> Self {
364 Self::new()
365 }
366}
367
#[cfg(test)]
mod tests {
    use dfir_macro::{dfir_syntax, dfir_test};
    use futures::StreamExt;

    use crate::util::simulation::{Address, Fleet, Hostname};
    use crate::util::unbounded_channel;

    /// A simple test to demonstrate use of the simulation framework. Implements an echo server
    /// and client.
    ///
    /// The client sends one message to the server's "echo" inbox. The server writes every message
    /// it receives back out of its "echo" outbox, addressed to the sender. The test then checks
    /// that the client received the original text back.
    #[dfir_test]
    async fn test_echo() {
        let mut fleet = Fleet::new();

        // Hostnames for the server and client
        let server: Hostname = "server".to_string();
        let client: Hostname = "client".to_string();

        // Interface name for the echo "protocol". Both hosts use it for their inbox AND their
        // outbox. The server sees the client's outbox address ("client", "echo") as the source,
        // so a reply sent to that address lands in the client's inbox of the same name.
        let interface: String = "echo".to_string();

        let server_address = Address::new(server.clone(), interface.clone());

        // Create the echo server. Delivered messages are paired with the sender's address, so
        // forwarding each `(msg, addr)` pair unchanged to the outbox sends the message back to
        // its sender.
        fleet.add_host(server.clone(), |ctx| {
            let network_input = ctx.new_inbox::<String>(interface.clone());
            let network_output = ctx.new_outbox::<String>(interface.clone());
            dfir_syntax! {
                out = dest_sink(network_output);

                source_stream(network_input)
                    -> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
                    -> out;
            }
        });

        // The client trigger channel is used to trigger the client into sending a message to the
        // server. This allows the unit test to control when the client sends a message.
        let (client_trigger_tx, client_trigger_rx) = unbounded_channel::<String>();
        // Replies received by the client are forwarded here so the test can inspect them.
        let (client_response_tx, mut client_response_rx) = unbounded_channel::<String>();

        // Create the client: trigger messages are addressed to the server; replies arriving on the
        // inbox are forwarded (without their source address) to `client_response_tx`.
        fleet.add_host(client.clone(), |ctx| {
            let network_out = ctx.new_outbox::<String>(interface.clone());
            let network_in = ctx.new_inbox::<String>(interface.clone());

            dfir_syntax! {
                out = dest_sink(network_out);

                source_stream(client_trigger_rx)
                    -> map(|msg| (msg, server_address.clone()))
                    -> out;

                source_stream(network_in)
                    -> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
                    -> for_each(|(msg, _addr)| client_response_tx.send(msg).unwrap());

            }
        });

        // Trigger the client to send a message.
        client_trigger_tx.send("Hello, world!".to_string()).unwrap();

        // Run the simulation until no new work is done by any host.
        fleet.run_until_quiescent().await;

        // Check that the message was received.
        let response = client_response_rx.next().await.unwrap();
        assert_eq!(response, "Hello, world!");
    }
}