dfir_rs/util/simulation.rs
1//! # Hydro/DFIR Deterministic Simulation Testing Framework
2//!
3//! This module provides a deterministic simulation testing framework for testing Hydro/DFIR
4//! processes.
5//!
6//! It can be used to test complex interactions between multiple Hydro/DFIR processes in a
7//! deterministic manner by running them in a single-threaded environment. The framework also
8//! provides a "virtual network" implementation that allows production processes to exchange
9//! messages within the simulation. More importantly, the network is fully under control of the
10//! unit test and the test can introduce faults such as message delays, message drops and
11//! network partitions.
12//!
13//! ## Overview
14//!
15//! Conceptually, the simulation contains a "Fleet", which is a collection of "Hosts". These
16//! aren't real hosts, but rather a collection of individual Hydro/DFIR processes (one per host)
17//! that can communicate with each other over a virtual network. Every host has a "hostname"
18//! which uniquely identifies it within the fleet.
19//!
20//! ```text
21//! ┌───────────────────────────────────────────────────────────────────────────────────────────┐
22//! │SIMULATION │
23//! │ ┌───────────────────────────────────────────────────────────────────────────────────────┐ │
24//! │ │FLEET │ │
25//! │ │ ┌───────────────────────────────┐ ┌───────────────────────────────┐ │ │
26//! │ │ │HOST │ │HOST │ │ │
27//! │ │ │ ┌──────┐ ┌──────┐ ┌──────┐ │ │ ┌──────┐ ┌──────┐ ┌──────┐ │ │ │
28//! │ │ │ │INBOX │ │INBOX │ │INBOX │ │ ┌-┼-►INBOX │ │INBOX │ │INBOX │ │ │ │
29//! │ │ │ └──┬───┘ └──┬───┘ └──┬───┘ │ │ │ └──┬───┘ └──┬───┘ └──┬───┘ │ │ │
30//! │ │ │ ┌──▼──────────▼─────────▼───┐ │ │ │ ┌──▼──────────▼─────────▼───┐ │ │ │
31//! │ │ │ │ │ │ │ │ │ │ │ │ │
32//! │ │ │ │ TRANSDUCER │ │ │ │ │ TRANSDUCER │ │ │ │
33//! │ │ │ │ │ │ │ │ │ │ │ │ │
34//! │ │ │ └───┬─────────┬──────────┬──┘ │ │ │ └───┬─────────┬─────────┬───┘ │ │ │
35//! │ │ │ ┌──▼───┐ ┌──▼───┐ ┌───▼──┐ │ │ │ ┌──▼───┐ ┌──▼───┐ ┌──▼───┐ │ │ │
36//! │ │ │ │OUTBOX│ │OUTBOX│ │OUTBOX┼-┼--┐ │ │ │OUTBOX│ │OUTBOX│ │OUTBOX│ │ │ │
37//! │ │ │ └──────┘ └──────┘ └──────┘ │ │ │ │ └──────┘ └──────┘ └──────┘ │ │ │
38//! │ │ └───────────────────────────────┘ │ │ └───────────────────────────────┘ │ │
39//! │ └────────────────────────────────────┼──────────────┼───────────────────────────────────┘ │
40//! │ ┌─┼──────────────┼─┐ │
41//! │ │ └--------------┘ │ │
42//! │ │ NETWORK MESSAGE │ │
43//! │ │ PROCESSING │ │
44//! │ └──────────────────┘ │
45//! └───────────────────────────────────────────────────────────────────────────────────────────┘
46//! ```
47//! ## Network Processing
48//!
49//! ### Outboxes & Inboxes
50//! When a process wishes to send a message to another process, it sends the message to an
51//! "outbox" on its host. The unit test invokes the simulation's network message processing logic
52//! at some desired cadence to pick up all messages from all outboxes and deliver them to the
53//! corresponding inboxes on the destination hosts. The network message processing logic is the
54//! point at which failures can be injected to change the behavior of the network.
55//!
56//! ### Interface Names
57//! Every inbox and outbox is associated with an "interface name". This is a string that uniquely
58//! identifies the interface on the host. When a process sends a message, it specifies the
59//! destination hostname and the interface name on that host to which the message should be
60//! delivered.
61//!
62//! ## Progress of Time in the Simulation
63//! The single-threaded unit test can drive time forward on every host by invoking the `run_tick`
64//! method on the host. This ultimately runs a single tick on the process. The unit test is
65//! also responsible for invoking the network message processing at the time of its choosing and
66//! can interleave the progress of time on various hosts and network processing as it sees fit.
67//!
68//! ## Examples
69//! Check the tests module for examples on how to use the simulation framework.
70use std::any::Any;
71use std::collections::HashMap;
72use std::fmt::Debug;
73use std::pin::Pin;
74
75use futures::{Sink, StreamExt};
76use serde::{Deserialize, Serialize};
77use tokio::sync::mpsc::UnboundedSender;
78use tokio_stream::Stream;
79use tokio_stream::wrappers::UnboundedReceiverStream;
80use tracing::trace;
81
82use crate::scheduled::graph::Dfir;
83use crate::util::{collect_ready_async, unbounded_channel};
84
/// A hostname is a unique identifier for a host in the simulation. It is used to address messages
/// to a specific host (and thus a specific Hydro/DFIR process).
///
/// This is a plain alias for [`String`], not a distinct type.
pub type Hostname = String;

/// An interface name is a unique identifier for an inbox or an outbox on a host.
///
/// This is a plain alias for [`String`], not a distinct type.
type InterfaceName = String;
91
92/// An address is a combination of a hostname and an interface name.
93#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
94pub struct Address {
95 host: Hostname,
96 interface: InterfaceName,
97}
98
99impl Address {
100 /// Create a new address with the given hostname and interface name.
101 pub fn new(host: Hostname, interface: InterfaceName) -> Self {
102 Address { host, interface }
103 }
104}
105
106/// A message sender is used to send messages to an inbox on a host.
107pub trait MessageSender {
108 /// Send a message to the inbox on the host.
109 fn send(&self, message: MessageWithAddress);
110}
111
112impl<T: 'static> MessageSender for UnboundedSender<(T, Address)> {
113 fn send(&self, message: (Box<dyn Any>, Address)) {
114 match message.0.downcast::<T>() {
115 Ok(msg) => {
116 self.send((*msg, message.1)).unwrap();
117 }
118 Err(e) => {
119 panic!("Failed to downcast message to expected type: {:?}", e);
120 }
121 }
122 }
123}
124
125/// A message with an delivery address.
126pub type MessageWithAddress = (Box<dyn Any>, Address);
127
/// An inbox is used by a host to receive messages for the process.
pub struct Inbox {
    /// Type-erased sender through which delivered messages are pushed towards the process.
    sender: Box<dyn MessageSender>,
}

/// Processes can send messages to other processes by putting those messages in an outbox
/// on their host.
pub struct Outbox {
    /// Stream of type-erased messages, each paired with its destination address, that the
    /// process has written to this outbox.
    receiver: Pin<Box<dyn Stream<Item = MessageWithAddress>>>,
}
138
139/// A host is a single Hydro/DFIR process running in the simulation. It has a unique hostname
140/// and can communicate with other hosts over the virtual network. It has a collection of inboxes
141/// and outboxes.
142pub struct Host {
143 name: Hostname,
144 process: Dfir<'static>,
145 inputs: HashMap<InterfaceName, Inbox>,
146 output: HashMap<InterfaceName, Outbox>,
147}
148
149impl Host {
150 /// Run a single tick on the host's process. Returns true if any work was done by the
151 /// process. This effectively "advances" time on the process.
152 pub async fn run_tick(&mut self) -> bool {
153 self.process.run_tick().await
154 }
155}
156
/// A builder for constructing a host in the simulation.
pub struct HostBuilder {
    /// Hostname the built host will carry.
    name: Hostname,
    /// The process for the host; `None` until `with_process` has been called.
    process: Option<Dfir<'static>>,
    /// Inboxes registered through the `ProcessBuilderContext`, keyed by interface name.
    inboxes: HashMap<InterfaceName, Inbox>,
    /// Outboxes registered through the `ProcessBuilderContext`, keyed by interface name.
    outboxes: HashMap<InterfaceName, Outbox>,
}
164
165/// Used in conjunction with the `HostBuilder` to construct a host in the simulation.
166pub struct ProcessBuilderContext<'context> {
167 inboxes: &'context mut HashMap<InterfaceName, Inbox>,
168 outboxes: &'context mut HashMap<InterfaceName, Outbox>,
169}
170
171fn sink_from_fn<T>(f: impl FnMut(T)) -> impl Sink<T, Error = crate::Never> {
172 sinktools::for_each(f)
173}
174
175impl ProcessBuilderContext<'_> {
176 /// Create a new inbox on the host with the given interface name. Returns a stream that can
177 /// be read by the process using the source_stream dfir operator.
178 pub fn new_inbox<T: 'static>(
179 &mut self,
180 interface: InterfaceName,
181 ) -> UnboundedReceiverStream<(T, Address)> {
182 let (sender, receiver) = unbounded_channel::<(T, Address)>();
183 self.inboxes.insert(
184 interface,
185 Inbox {
186 sender: Box::new(sender),
187 },
188 );
189 receiver
190 }
191
192 /// Creates a new outbox on the host with the given interface name. Returns a sink that can
193 /// be written to by the process using the dest_sink dfir operator.
194 pub fn new_outbox<T: 'static>(
195 &mut self,
196 interface: InterfaceName,
197 ) -> impl use<T> + Sink<(T, Address), Error = crate::Never> {
198 let (sender, receiver) = unbounded_channel::<(T, Address)>();
199
200 let receiver = receiver.map(|(msg, addr)| (Box::new(msg) as Box<dyn Any>, addr));
201
202 self.outboxes.insert(
203 interface,
204 Outbox {
205 receiver: Box::pin(receiver),
206 },
207 );
208
209 sink_from_fn(move |message: (T, Address)| sender.send((message.0, message.1)).unwrap())
210 }
211}
212
213impl HostBuilder {
214 /// Creates a new instance of HostBuilder for a given hostname,
215 pub fn new(name: Hostname) -> Self {
216 HostBuilder {
217 name,
218 process: None,
219 inboxes: Default::default(),
220 outboxes: Default::default(),
221 }
222 }
223
224 /// Supplies the (mandatory) process that runs on this host.
225 pub fn with_process<F>(mut self, builder: F) -> Self
226 where
227 F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
228 {
229 let mut context = ProcessBuilderContext {
230 inboxes: &mut self.inboxes,
231 outboxes: &mut self.outboxes,
232 };
233 let process = builder(&mut context);
234 self.process = Some(process);
235 self
236 }
237
238 /// Builds the host with the supplied configuration.
239 pub fn build(self) -> Host {
240 if self.process.is_none() {
241 panic!("Process is required to build a host");
242 }
243
244 Host {
245 name: self.name,
246 process: self.process.unwrap(),
247 inputs: self.inboxes,
248 output: self.outboxes,
249 }
250 }
251}
252
253/// A fleet is a collection of hosts in the simulation. It is responsible for running the
254/// simulation and processing network messages.
255pub struct Fleet {
256 hosts: HashMap<String, Host>,
257}
258
259impl Fleet {
260 /// Creates a new instance of Fleet.
261 pub fn new() -> Self {
262 Fleet {
263 hosts: HashMap::new(),
264 }
265 }
266
267 /// Adds a new host to the fleet with the given name and process.
268 pub fn add_host<F>(&mut self, name: String, process_builder: F) -> &Host
269 where
270 F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
271 {
272 let host = HostBuilder::new(name.clone())
273 .with_process(process_builder)
274 .build();
275 assert!(
276 self.hosts.insert(host.name.clone(), host).is_none(),
277 "Host with name {} already exists",
278 name
279 );
280 self.get_host(&name).unwrap()
281 }
282
283 /// Get a host by name.
284 pub fn get_host(&self, name: &str) -> Option<&Host> {
285 self.hosts.get(name)
286 }
287
288 /// Get a host by name.
289 pub fn get_host_mut(&mut self, name: &str) -> Option<&mut Host> {
290 self.hosts.get_mut(name)
291 }
292
293 /// Advance time on all hosts by a single tick. Returns true if any work was done by any of the
294 /// hosts. After ticking once on all the hosts, the method also processes network messages.
295 ///
296 /// The order in which the ticks are processed is not guaranteed.
297 pub async fn run_single_tick_all_hosts(&mut self) -> bool {
298 let mut work_done: bool = false;
299
300 for (name, host) in self.hosts.iter_mut() {
301 trace!("Running tick for host: {}", name);
302 work_done |= host.run_tick().await;
303 }
304
305 self.process_network().await;
306
307 work_done
308 }
309
310 /// Process all network messages in the simulation. This method picks up all messages from all
311 /// outboxes on all hosts and delivers them to the corresponding inboxes on the destination.
312 ///
313 /// The order in which the messages are processed is not guaranteed.
314 pub async fn process_network(&mut self) {
315 let mut all_messages: Vec<(Address, MessageWithAddress)> = Vec::new();
316
317 // Collect all messages from all outboxes on all hosts.
318 for (name, host) in self.hosts.iter_mut() {
319 for (interface, output) in host.output.iter_mut() {
320 let src_address = Address::new(name.clone(), interface.clone());
321 let all_messages_on_interface: Vec<_> =
322 collect_ready_async(&mut output.receiver).await;
323 for message_on_interface in all_messages_on_interface {
324 all_messages.push((src_address.clone(), message_on_interface));
325 }
326 }
327 }
328
329 // Deliver all messages to the corresponding inboxes on the destination hosts.
330 for (src_address, (msg, addr)) in all_messages {
331 if let Some(destination_host) = self.hosts.get(&addr.host) {
332 if let Some(input) = destination_host.inputs.get(&addr.interface) {
333 input.sender.send((msg, src_address.clone()));
334 } else {
335 trace!(
336 "No interface named {:?} found on host {:?}. Dropping message {:?}.",
337 addr.interface, addr.host, msg
338 );
339 }
340 } else {
341 trace!(
342 "No host named {:?} found. Dropping message {:?}.",
343 addr.host, msg
344 );
345 }
346 }
347 }
348
349 /// Tick all hosts until all hosts are quiescent (i.e. no new work is done by any host). Ticking
350 /// is done in "rounds". At each round, all hosts are ticked once and then network messages are
351 /// processed. The process continues until no work is done by any host in a round.
352 pub async fn run_until_quiescent(&mut self) {
353 while self.run_single_tick_all_hosts().await {}
354 }
355}
356
357impl Default for Fleet {
358 fn default() -> Self {
359 Self::new()
360 }
361}
362
#[cfg(test)]
mod tests {
    use dfir_macro::{dfir_syntax, dfir_test};
    use futures::StreamExt;

    use crate::util::simulation::{Address, Fleet, Hostname};
    use crate::util::unbounded_channel;

    /// A simple test to demonstrate use of the simulation framework. Implements an echo server
    /// and client.
    ///
    /// The client sends one message to the server's "echo" interface; the server sends every
    /// message it receives back to the address it arrived with, and the test asserts that the
    /// client gets its own message back.
    #[dfir_test]
    async fn test_echo() {
        let mut fleet = Fleet::new();

        // Hostnames for the server and client
        let server: Hostname = "server".to_string();
        let client: Hostname = "client".to_string();

        // Interface name for the echo "protocol"
        let interface: String = "echo".to_string();

        let server_address = Address::new(server.clone(), interface.clone());

        // Create the echo server
        fleet.add_host(server.clone(), |ctx| {
            let network_input = ctx.new_inbox::<String>(interface.clone());
            let network_output = ctx.new_outbox::<String>(interface.clone());
            dfir_syntax! {
                out = dest_sink(network_output);

                // Each inbox item is `(message, sender address)`. Forwarding the pair unchanged
                // to the outbox therefore addresses the reply to the original sender.
                source_stream(network_input)
                    -> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
                    -> out;
            }
        });

        // The client trigger channel is used to trigger the client into sending a message to the
        // server. This allows the unit test to control when the client sends a message.
        let (client_trigger_tx, client_trigger_rx) = unbounded_channel::<String>();
        // The client response channel carries whatever the client receives from the network back
        // to the unit test so that it can be asserted on.
        let (client_response_tx, mut client_response_rx) = unbounded_channel::<String>();

        // Create the echo client. Its outbox and inbox share the interface name, so the sender
        // address the server sees (client host + outbox interface) names the client's inbox.
        fleet.add_host(client.clone(), |ctx| {
            let network_out = ctx.new_outbox::<String>(interface.clone());
            let network_in = ctx.new_inbox::<String>(interface.clone());

            dfir_syntax! {
                out = dest_sink(network_out);

                // Every triggered message is addressed to the server's echo interface.
                source_stream(client_trigger_rx)
                    -> map(|msg| (msg, server_address.clone()))
                    -> out;

                // Anything received from the network is handed back to the unit test.
                source_stream(network_in)
                    -> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
                    -> for_each(|(msg, _addr)| client_response_tx.send(msg).unwrap());

            }
        });

        // Trigger the client to send a message.
        client_trigger_tx.send("Hello, world!".to_string()).unwrap();

        // Run the simulation until no new work is done by any host.
        fleet.run_until_quiescent().await;

        // Check that the message was received.
        let response = client_response_rx.next().await.unwrap();
        assert_eq!(response, "Hello, world!");
    }
}