//! # Hydro/DFIR Deterministic Simulation Testing Framework
//!
//! This module provides a deterministic simulation testing framework for testing Hydro/DFIR
//! processes.
//!
//! It can be used to test complex interactions between multiple Hydro/DFIR processes in a
//! deterministic manner by running them in a single-threaded environment. The framework also
//! provides a "virtual network" implementation that allows production processes to exchange
//! messages within the simulation. More importantly, the network is fully under the control of
//! the unit test, which can introduce faults such as message delays, message drops and
//! network partitions.
//!
//! ## Overview
//!
//! Conceptually, the simulation contains a "Fleet", which is a collection of "Hosts". These
//! aren't real hosts, but rather individual Hydro/DFIR processes (one per host) that can
//! communicate with each other over a virtual network. Every host has a "hostname" which
//! uniquely identifies it within the fleet.
//!
//! ```text
//!  ┌───────────────────────────────────────────────────────────────────────────────────────────┐
//!  │SIMULATION                                                                                 │
//!  │ ┌───────────────────────────────────────────────────────────────────────────────────────┐ │
//!  │ │FLEET                                                                                  │ │
//!  │ │ ┌───────────────────────────────┐                   ┌───────────────────────────────┐ │ │
//!  │ │ │HOST                           │                   │HOST                           │ │ │
//!  │ │ │ ┌──────┐   ┌──────┐  ┌──────┐ │                   │ ┌──────┐   ┌──────┐  ┌──────┐ │ │ │
//!  │ │ │ │INBOX │   │INBOX │  │INBOX │ │                 ┌-┼-►INBOX │   │INBOX │  │INBOX │ │ │ │
//!  │ │ │ └──┬───┘   └──┬───┘  └──┬───┘ │                 │ │ └──┬───┘   └──┬───┘  └──┬───┘ │ │ │
//!  │ │ │ ┌──▼──────────▼─────────▼───┐ │                 │ │ ┌──▼──────────▼─────────▼───┐ │ │ │
//!  │ │ │ │                           │ │                 │ │ │                           │ │ │ │
//!  │ │ │ │        TRANSDUCER         │ │                 │ │ │        TRANSDUCER         │ │ │ │
//!  │ │ │ │                           │ │                 │ │ │                           │ │ │ │
//!  │ │ │ └───┬─────────┬──────────┬──┘ │                 │ │ └───┬─────────┬─────────┬───┘ │ │ │
//!  │ │ │  ┌──▼───┐  ┌──▼───┐  ┌───▼──┐ │                 │ │  ┌──▼───┐  ┌──▼───┐  ┌──▼───┐ │ │ │
//!  │ │ │  │OUTBOX│  │OUTBOX│  │OUTBOX┼-┼--┐              │ │  │OUTBOX│  │OUTBOX│  │OUTBOX│ │ │ │
//!  │ │ │  └──────┘  └──────┘  └──────┘ │  │              │ │  └──────┘  └──────┘  └──────┘ │ │ │
//!  │ │ └───────────────────────────────┘  │              │ └───────────────────────────────┘ │ │
//!  │ └────────────────────────────────────┼──────────────┼───────────────────────────────────┘ │
//!  │                                    ┌─┼──────────────┼─┐                                   │
//!  │                                    │ └--------------┘ │                                   │
//!  │                                    │ NETWORK MESSAGE  │                                   │
//!  │                                    │    PROCESSING    │                                   │
//!  │                                    └──────────────────┘                                   │
//!  └───────────────────────────────────────────────────────────────────────────────────────────┘
//! ```
//!
//! ## Network Processing
//!
//! ### Outboxes & Inboxes
//! When a process wishes to send a message to another process, it sends the message to an
//! "outbox" on its host. The unit test invokes the simulation's network message processing logic
//! at some desired cadence to pick up all messages from all outboxes and deliver them to the
//! corresponding inboxes on the destination hosts. The network message processing logic is the
//! point at which failures can be injected to change the behavior of the network.
//!
//! ### Interface Names
//! Every inbox and outbox is associated with an "interface name". This is a string that uniquely
//! identifies the interface on the host. When a process sends a message, it specifies the
//! destination hostname and the interface name on that host to which the message should be
//! delivered. On delivery, the receiving process sees the message paired with the address
//! (hostname and interface name) of the outbox it was sent from.
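//!
//! The routing described above can be modeled without any Hydro/DFIR machinery. The following
//! is a minimal, standard-library-only sketch, not this module's implementation:
//! `ModelNetwork`, `ModelAddress` and the `should_drop` predicate are hypothetical names used
//! only for illustration, and the fault injection hook is one possible design rather than an
//! existing API of this module.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical stand-in for an address: (hostname, interface name).
//! type ModelAddress = (String, String);
//!
//! /// Hypothetical model of the virtual network.
//! #[derive(Default)]
//! struct ModelNetwork {
//!     /// Messages waiting in outboxes, as (source, destination, payload).
//!     outboxes: Vec<(ModelAddress, ModelAddress, String)>,
//!     /// Delivered messages per destination address, as (payload, source).
//!     inboxes: HashMap<ModelAddress, Vec<(String, ModelAddress)>>,
//! }
//!
//! impl ModelNetwork {
//!     /// Registers an inbox so that messages addressed to it can be delivered.
//!     fn add_inbox(&mut self, host: &str, interface: &str) {
//!         self.inboxes.entry((host.to_string(), interface.to_string())).or_default();
//!     }
//!
//!     /// Queues a message in the sender's outbox.
//!     fn send(&mut self, src: ModelAddress, dst: ModelAddress, payload: &str) {
//!         self.outboxes.push((src, dst, payload.to_string()));
//!     }
//!
//!     /// Drains every outbox and delivers to the matching inbox. Messages for which
//!     /// `should_drop` returns true, or whose destination does not exist, are discarded.
//!     /// Returns the number of messages delivered.
//!     fn process<F>(&mut self, should_drop: F) -> usize
//!     where
//!         F: Fn(&ModelAddress, &ModelAddress) -> bool,
//!     {
//!         let mut delivered = 0;
//!         for (src, dst, payload) in std::mem::take(&mut self.outboxes) {
//!             if should_drop(&src, &dst) {
//!                 continue; // Injected fault: the message is lost in transit.
//!             }
//!             if let Some(inbox) = self.inboxes.get_mut(&dst) {
//!                 inbox.push((payload, src));
//!                 delivered += 1;
//!             }
//!         }
//!         delivered
//!     }
//! }
//! ```
//!
//! Passing `|_, _| false` models a healthy network, while a predicate such as
//! `|src, _| src.0 == "client"` models a partition that isolates the host named `client`.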
//!
//! ## Progress of Time in the Simulation
//! The single-threaded unit test can drive time forward on every host by invoking the `run_tick`
//! method on the host. This ultimately runs a single tick on the process. The unit test is
//! also responsible for invoking the network message processing at the time of its choosing and
//! can interleave the progress of time on various hosts and network processing as it sees fit.
//!
//! ## Examples
//! See the `tests` module for examples of how to use the simulation framework.
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::pin::Pin;

use futures::{Sink, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::Stream;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::trace;

use crate::scheduled::graph::Dfir;
use crate::util::{collect_ready_async, unbounded_channel};

/// A hostname is a unique identifier for a host in the simulation. It is used to address messages
/// to a specific host (and thus a specific Hydro/DFIR process).
pub type Hostname = String;

/// An interface name is a unique identifier for an inbox or an outbox on a host.
type InterfaceName = String;

/// An address is a combination of a hostname and an interface name.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Address {
    host: Hostname,
    interface: InterfaceName,
}

impl Address {
    /// Create a new address with the given hostname and interface name.
    pub fn new(host: Hostname, interface: InterfaceName) -> Self {
        Address { host, interface }
    }
}

/// A message sender is used to send messages to an inbox on a host.
pub trait MessageSender {
    /// Send a message to the inbox on the host.
    fn send(&self, message: MessageWithAddress);
}

impl<T: 'static> MessageSender for UnboundedSender<(T, Address)> {
    /// Downcasts the type-erased message to `T` and forwards it to the channel.
    ///
    /// Panics if the message is not of type `T` or if the receiving end has been dropped.
    fn send(&self, message: MessageWithAddress) {
        match message.0.downcast::<T>() {
            Ok(msg) => {
                self.send((*msg, message.1)).unwrap();
            }
            Err(e) => {
                panic!("Failed to downcast message to expected type: {:?}", e);
            }
        }
    }
}

/// A message with a delivery address.
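///
/// The payload is type-erased so that outboxes carrying different message types can share one
/// delivery path; the receiving inbox recovers the concrete type with a downcast. A minimal
/// sketch of that round trip, using only the standard library (`erase` and `recover` are
/// hypothetical helper names, not part of this module):
///
/// ```rust
/// use std::any::Any;
///
/// /// Erases the concrete type of a message, as an outbox does before the network sees it.
/// fn erase<T: 'static>(message: T) -> Box<dyn Any> {
///     Box::new(message)
/// }
///
/// /// Recovers the concrete type, as an inbox does on delivery. Returns `None` on a type
/// /// mismatch, whereas the `MessageSender` implementation above panics in that case.
/// fn recover<T: 'static>(message: Box<dyn Any>) -> Option<T> {
///     message.downcast::<T>().ok().map(|boxed| *boxed)
/// }
/// ```
///
/// For instance, `recover::<String>(erase(String::from("hi")))` yields `Some("hi".to_string())`,
/// while `recover::<u32>(erase(String::from("hi")))` yields `None`.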
pub type MessageWithAddress = (Box<dyn Any>, Address);

/// An inbox is used by a host to receive messages for the process.
pub struct Inbox {
    sender: Box<dyn MessageSender>,
}

/// Processes can send messages to other processes by putting those messages in an outbox
/// on their host.
pub struct Outbox {
    receiver: Pin<Box<dyn Stream<Item = MessageWithAddress>>>,
}

/// A host is a single Hydro/DFIR process running in the simulation. It has a unique hostname
/// and can communicate with other hosts over the virtual network. It has a collection of inboxes
/// and outboxes.
pub struct Host {
    name: Hostname,
    process: Dfir<'static>,
    inputs: HashMap<InterfaceName, Inbox>,
    output: HashMap<InterfaceName, Outbox>,
}

impl Host {
    /// Run a single tick on the host's process. Returns true if any work was done by the
    /// process. This effectively "advances" time on the process.
    pub async fn run_tick(&mut self) -> bool {
        self.process.run_tick().await
    }
}

/// A builder for constructing a host in the simulation.
pub struct HostBuilder {
    name: Hostname,
    process: Option<Dfir<'static>>,
    inboxes: HashMap<InterfaceName, Inbox>,
    outboxes: HashMap<InterfaceName, Outbox>,
}

/// Used in conjunction with the `HostBuilder` to construct a host in the simulation.
pub struct ProcessBuilderContext<'context> {
    inboxes: &'context mut HashMap<InterfaceName, Inbox>,
    outboxes: &'context mut HashMap<InterfaceName, Outbox>,
}

/// Wraps a closure in a `Sink` that passes each item to the closure.
fn sink_from_fn<T>(f: impl FnMut(T)) -> impl Sink<T, Error = crate::Never> {
    sinktools::for_each(f)
}

impl ProcessBuilderContext<'_> {
    /// Create a new inbox on the host with the given interface name. Returns a stream that can
    /// be read by the process using the `source_stream` DFIR operator.
    ///
    /// If an inbox with the same interface name already exists on the host, it is replaced.
    pub fn new_inbox<T: 'static>(
        &mut self,
        interface: InterfaceName,
    ) -> UnboundedReceiverStream<(T, Address)> {
        let (sender, receiver) = unbounded_channel::<(T, Address)>();
        self.inboxes.insert(
            interface,
            Inbox {
                sender: Box::new(sender),
            },
        );
        receiver
    }

    /// Creates a new outbox on the host with the given interface name. Returns a sink that can
    /// be written to by the process using the `dest_sink` DFIR operator.
    ///
    /// If an outbox with the same interface name already exists on the host, it is replaced.
    pub fn new_outbox<T: 'static>(
        &mut self,
        interface: InterfaceName,
    ) -> impl use<T> + Sink<(T, Address), Error = crate::Never> {
        let (sender, receiver) = unbounded_channel::<(T, Address)>();

        // Erase the message type so that all outboxes share a single item type.
        let receiver = receiver.map(|(msg, addr)| (Box::new(msg) as Box<dyn Any>, addr));

        self.outboxes.insert(
            interface,
            Outbox {
                receiver: Box::pin(receiver),
            },
        );

        sink_from_fn(move |message: (T, Address)| sender.send(message).unwrap())
    }
}

impl HostBuilder {
    /// Creates a new instance of `HostBuilder` for a given hostname.
    pub fn new(name: Hostname) -> Self {
        HostBuilder {
            name,
            process: None,
            inboxes: Default::default(),
            outboxes: Default::default(),
        }
    }

    /// Supplies the (mandatory) process that runs on this host.
    pub fn with_process<F>(mut self, builder: F) -> Self
    where
        F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
    {
        let mut context = ProcessBuilderContext {
            inboxes: &mut self.inboxes,
            outboxes: &mut self.outboxes,
        };
        let process = builder(&mut context);
        self.process = Some(process);
        self
    }

    /// Builds the host with the supplied configuration.
    ///
    /// Panics if no process was supplied via `with_process`.
    pub fn build(self) -> Host {
        let process = self
            .process
            .expect("Process is required to build a host");

        Host {
            name: self.name,
            process,
            inputs: self.inboxes,
            output: self.outboxes,
        }
    }
}

/// A fleet is a collection of hosts in the simulation. It is responsible for running the
/// simulation and processing network messages.
pub struct Fleet {
    hosts: HashMap<String, Host>,
}

impl Fleet {
    /// Creates a new instance of Fleet.
    pub fn new() -> Self {
        Fleet {
            hosts: HashMap::new(),
        }
    }

    /// Adds a new host to the fleet with the given name and process.
    ///
    /// Panics if a host with the same name already exists.
    pub fn add_host<F>(&mut self, name: String, process_builder: F) -> &Host
    where
        F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
    {
        let host = HostBuilder::new(name.clone())
            .with_process(process_builder)
            .build();
        let previous = self.hosts.insert(name.clone(), host);
        assert!(
            previous.is_none(),
            "Host with name {} already exists",
            name
        );
        self.get_host(&name).unwrap()
    }

    /// Get a host by name.
    pub fn get_host(&self, name: &str) -> Option<&Host> {
        self.hosts.get(name)
    }

    /// Get a mutable reference to a host by name.
    pub fn get_host_mut(&mut self, name: &str) -> Option<&mut Host> {
        self.hosts.get_mut(name)
    }

    /// Advance time on all hosts by a single tick. Returns true if any work was done by any of the
    /// hosts. After ticking once on all the hosts, the method also processes network messages.
    ///
    /// The order in which the hosts are ticked is not guaranteed, because hosts are stored in a
    /// `HashMap`.
    pub async fn run_single_tick_all_hosts(&mut self) -> bool {
        let mut work_done = false;

        for (name, host) in self.hosts.iter_mut() {
            trace!("Running tick for host: {}", name);
            work_done |= host.run_tick().await;
        }

        self.process_network().await;

        work_done
    }

    /// Process all network messages in the simulation. This method picks up all messages from all
    /// outboxes on all hosts and delivers them to the corresponding inboxes on the destination.
    /// Messages addressed to an unknown host or interface are dropped and logged at trace level.
    ///
    /// The order in which the messages are processed is not guaranteed.
    pub async fn process_network(&mut self) {
        let mut all_messages: Vec<(Address, MessageWithAddress)> = Vec::new();

        // Collect all messages from all outboxes on all hosts.
        for (name, host) in self.hosts.iter_mut() {
            for (interface, output) in host.output.iter_mut() {
                let src_address = Address::new(name.clone(), interface.clone());
                let all_messages_on_interface: Vec<_> =
                    collect_ready_async(&mut output.receiver).await;
                for message_on_interface in all_messages_on_interface {
                    all_messages.push((src_address.clone(), message_on_interface));
                }
            }
        }

        // Deliver all messages to the corresponding inboxes on the destination hosts.
        // The receiver sees each message paired with the address it was sent from.
        for (src_address, (msg, addr)) in all_messages {
            if let Some(destination_host) = self.hosts.get(&addr.host) {
                if let Some(input) = destination_host.inputs.get(&addr.interface) {
                    input.sender.send((msg, src_address));
                } else {
                    trace!(
                        "No interface named {:?} found on host {:?}. Dropping message {:?}.",
                        addr.interface, addr.host, msg
                    );
                }
            } else {
                trace!(
                    "No host named {:?} found. Dropping message {:?}.",
                    addr.host, msg
                );
            }
        }
    }

    /// Tick all hosts until all hosts are quiescent (i.e. no new work is done by any host). Ticking
    /// is done in "rounds". At each round, all hosts are ticked once and then network messages are
    /// processed. The process continues until no work is done by any host in a round.
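    ///
    /// The round structure can be modeled in isolation. The following is a standard-library-only
    /// sketch, not this method's implementation: `ModelHost` and `model_run_until_quiescent` are
    /// hypothetical names, and network processing is omitted for brevity.
    ///
    /// ```rust
    /// /// Hypothetical stand-in for a host holding a number of pending work items.
    /// struct ModelHost {
    ///     pending: u32,
    /// }
    ///
    /// impl ModelHost {
    ///     /// Processes at most one work item. Returns true if any work was done.
    ///     fn run_tick(&mut self) -> bool {
    ///         if self.pending == 0 {
    ///             return false;
    ///         }
    ///         self.pending -= 1;
    ///         true
    ///     }
    /// }
    ///
    /// /// Ticks every host once per round until a full round does no work.
    /// /// Returns the number of rounds executed, including the final idle round.
    /// fn model_run_until_quiescent(hosts: &mut [ModelHost]) -> usize {
    ///     let mut rounds = 0;
    ///     loop {
    ///         rounds += 1;
    ///         let mut work_done = false;
    ///         for host in hosts.iter_mut() {
    ///             // `|=` does not short-circuit, so every host is ticked in every round.
    ///             work_done |= host.run_tick();
    ///         }
    ///         if !work_done {
    ///             return rounds;
    ///         }
    ///     }
    /// }
    /// ```
    ///
    /// With hosts holding 2, 0 and 1 pending items, the sketch runs three rounds: two that do work
    /// and a final idle round that detects quiescence.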
    pub async fn run_until_quiescent(&mut self) {
        while self.run_single_tick_all_hosts().await {}
    }
}

impl Default for Fleet {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use dfir_macro::{dfir_syntax, dfir_test};
    use futures::StreamExt;

    use crate::util::simulation::{Address, Fleet, Hostname};
    use crate::util::unbounded_channel;

    /// A simple test to demonstrate use of the simulation framework. Implements an echo server
    /// and client.
    #[dfir_test]
    async fn test_echo() {
        let mut fleet = Fleet::new();

        // Hostnames for the server and client
        let server: Hostname = "server".to_string();
        let client: Hostname = "client".to_string();

        // Interface name for the echo "protocol"
        let interface: String = "echo".to_string();

        let server_address = Address::new(server.clone(), interface.clone());

        // Create the echo server
        fleet.add_host(server.clone(), |ctx| {
            let network_input = ctx.new_inbox::<String>(interface.clone());
            let network_output = ctx.new_outbox::<String>(interface.clone());
            dfir_syntax! {
                out = dest_sink(network_output);

                source_stream(network_input)
                    -> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
                    -> out;
            }
        });

        // The client trigger channel is used to trigger the client into sending a message to the
        // server. This allows the unit test to control when the client sends a message.
        let (client_trigger_tx, client_trigger_rx) = unbounded_channel::<String>();
        let (client_response_tx, mut client_response_rx) = unbounded_channel::<String>();

        fleet.add_host(client.clone(), |ctx| {
            let network_out = ctx.new_outbox::<String>(interface.clone());
            let network_in = ctx.new_inbox::<String>(interface.clone());

            dfir_syntax! {
                out = dest_sink(network_out);

                source_stream(client_trigger_rx)
                    -> map(|msg| (msg, server_address.clone()))
                    -> out;

                source_stream(network_in)
                    -> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
                    -> for_each(|(msg, _addr)| client_response_tx.send(msg).unwrap());
            }
        });

        // Trigger the client to send a message.
        client_trigger_tx.send("Hello, world!".to_string()).unwrap();

        // Run the simulation until no new work is done by any host.
        fleet.run_until_quiescent().await;

        // Check that the message was received.
        let response = client_response_rx.next().await.unwrap();
        assert_eq!(response, "Hello, world!");
    }
}