// dfir_rs/util/simulation.rs

//! # Hydro/DFIR Deterministic Simulation Testing Framework
//!
//! This module provides a deterministic simulation testing framework for Hydro/DFIR processes.
//!
//! It can be used to test complex interactions between multiple Hydro/DFIR processes in a
//! deterministic manner by running them in a single-threaded environment. The framework also
//! provides a "virtual network" implementation that allows production processes to exchange
//! messages within the simulation. More importantly, the network is fully under the control of
//! the unit test, and the test can introduce faults such as message delays, message drops and
//! network partitions.
//!
//! ## Overview
//!
//! Conceptually, the simulation contains a "Fleet", which is a collection of "Hosts". These
//! aren't real hosts: each host wraps a single Hydro/DFIR process, and the hosts communicate
//! with each other over a virtual network. Every host has a "hostname" which uniquely
//! identifies it within the fleet.
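//!
//! The fleet/host relationship can be pictured with a minimal, standalone sketch. The `ToyFleet`
//! and `ToyHost` types below are hypothetical stand-ins, not this module's `Fleet` and `Host`;
//! they only illustrate that hosts are keyed by a unique hostname and that each host owns its
//! own named inboxes and outboxes.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical stand-in for a host: its named inboxes and outboxes (the process is omitted).
//! #[derive(Default)]
//! struct ToyHost {
//!     inboxes: HashMap<String, Vec<String>>,
//!     outboxes: HashMap<String, Vec<String>>,
//! }
//!
//! /// Hypothetical stand-in for a fleet: hosts indexed by their unique hostname.
//! #[derive(Default)]
//! struct ToyFleet {
//!     hosts: HashMap<String, ToyHost>,
//! }
//!
//! impl ToyFleet {
//!     /// Registers a host, panicking on a duplicate hostname (as `Fleet::add_host` does).
//!     fn add_host(&mut self, name: &str) -> &mut ToyHost {
//!         assert!(!self.hosts.contains_key(name), "Host with name {} already exists", name);
//!         self.hosts.entry(name.to_string()).or_default()
//!     }
//! }
//! ```
//!
//! Using a map keyed by hostname makes the uniqueness check a single lookup, which mirrors how
//! `Fleet` stores its hosts.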
//!
//! ```text
//!  ┌───────────────────────────────────────────────────────────────────────────────────────────┐
//!  │SIMULATION                                                                                 │
//!  │ ┌───────────────────────────────────────────────────────────────────────────────────────┐ │
//!  │ │FLEET                                                                                  │ │
//!  │ │ ┌───────────────────────────────┐                   ┌───────────────────────────────┐ │ │
//!  │ │ │HOST                           │                   │HOST                           │ │ │
//!  │ │ │ ┌──────┐   ┌──────┐  ┌──────┐ │                   │ ┌──────┐   ┌──────┐  ┌──────┐ │ │ │
//!  │ │ │ │INBOX │   │INBOX │  │INBOX │ │                 ┌-┼-►INBOX │   │INBOX │  │INBOX │ │ │ │
//!  │ │ │ └──┬───┘   └──┬───┘  └──┬───┘ │                 │ │ └──┬───┘   └──┬───┘  └──┬───┘ │ │ │
//!  │ │ │ ┌──▼──────────▼─────────▼───┐ │                 │ │ ┌──▼──────────▼─────────▼───┐ │ │ │
//!  │ │ │ │                           │ │                 │ │ │                           │ │ │ │
//!  │ │ │ │        TRANSDUCER         │ │                 │ │ │        TRANSDUCER         │ │ │ │
//!  │ │ │ │                           │ │                 │ │ │                           │ │ │ │
//!  │ │ │ └───┬─────────┬──────────┬──┘ │                 │ │ └───┬─────────┬─────────┬───┘ │ │ │
//!  │ │ │  ┌──▼───┐  ┌──▼───┐  ┌───▼──┐ │                 │ │  ┌──▼───┐  ┌──▼───┐  ┌──▼───┐ │ │ │
//!  │ │ │  │OUTBOX│  │OUTBOX│  │OUTBOX┼-┼--┐              │ │  │OUTBOX│  │OUTBOX│  │OUTBOX│ │ │ │
//!  │ │ │  └──────┘  └──────┘  └──────┘ │  │              │ │  └──────┘  └──────┘  └──────┘ │ │ │
//!  │ │ └───────────────────────────────┘  │              │ └───────────────────────────────┘ │ │
//!  │ └────────────────────────────────────┼──────────────┼───────────────────────────────────┘ │
//!  │                                    ┌─┼──────────────┼─┐                                   │
//!  │                                    │ └--------------┘ │                                   │
//!  │                                    │ NETWORK MESSAGE  │                                   │
//!  │                                    │    PROCESSING    │                                   │
//!  │                                    └──────────────────┘                                   │
//!  └───────────────────────────────────────────────────────────────────────────────────────────┘
//! ```
//!
//! ## Network Processing
//!
//! ### Outboxes & Inboxes
//! When a process wishes to send a message to another process, it places the message in an
//! "outbox" on its host. The unit test invokes the simulation's network message processing
//! logic (`Fleet::process_network`) at whatever cadence it chooses; each invocation picks up all
//! messages from all outboxes and delivers them to the corresponding inboxes on the destination
//! hosts. The network message processing logic is the point at which failures can be injected
//! to change the behavior of the network.
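//!
//! The two-phase shape of network processing (drain every outbox first, then deliver) and the
//! place where a fault could be injected can be sketched in a self-contained way. Everything
//! here is hypothetical: `InFlight`, `process_network_with` and the `keep` predicate are
//! illustrative names, and in this file `Fleet::process_network` does not take such a predicate.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical in-flight message: (source host, destination host, payload).
//! type InFlight = (String, String, String);
//!
//! /// Drains all outboxes, then delivers each message to its destination inbox unless the
//! /// `keep` predicate rejects it (the fault-injection point in this sketch).
//! fn process_network_with(
//!     outboxes: &mut HashMap<String, Vec<InFlight>>,
//!     inboxes: &mut HashMap<String, Vec<String>>,
//!     keep: impl Fn(&InFlight) -> bool,
//! ) {
//!     // Phase 1: collect every pending message from every outbox.
//!     let mut all: Vec<InFlight> = Vec::new();
//!     for outbox in outboxes.values_mut() {
//!         all.extend(outbox.drain(..));
//!     }
//!     // Phase 2: deliver, letting the test drop messages (e.g. to model a partition).
//!     for msg in all {
//!         if keep(&msg) {
//!             let (_src, dst, payload) = msg;
//!             inboxes.entry(dst).or_default().push(payload);
//!         }
//!     }
//! }
//! ```
//!
//! Collecting before delivering means a message sent during one round can never be delivered
//! and consumed within that same round; a test could pass a different predicate on each call to
//! model a partition that later heals.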
//!
//! ### Interface Names
//! Every inbox and outbox is associated with an "interface name", a string that uniquely
//! identifies the interface on its host. When a process sends a message, it specifies the
//! destination hostname and the name of the interface on that host to which the message should
//! be delivered. Together, the hostname and interface name form an `Address`.
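//!
//! Delivery by address is a two-level lookup: first the hostname, then the interface name on
//! that host. The sketch below is a hypothetical, self-contained model (`Delivery`, `Inboxes`
//! and `deliver` are illustrative, not this module's types); like `Fleet::process_network`, it
//! drops messages whose host or interface does not exist.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Outcome of trying to deliver one message in this sketch.
//! #[derive(Debug, PartialEq)]
//! enum Delivery {
//!     Delivered,
//!     NoSuchHost,
//!     NoSuchInterface,
//! }
//!
//! /// Hostname -> interface name -> queued messages.
//! type Inboxes = HashMap<String, HashMap<String, Vec<String>>>;
//!
//! /// Resolves the hostname, then the interface, and queues the message if both exist.
//! fn deliver(inboxes: &mut Inboxes, host: &str, interface: &str, msg: String) -> Delivery {
//!     match inboxes.get_mut(host) {
//!         None => Delivery::NoSuchHost,
//!         Some(interfaces) => match interfaces.get_mut(interface) {
//!             None => Delivery::NoSuchInterface,
//!             Some(queue) => {
//!                 queue.push(msg);
//!                 Delivery::Delivered
//!             }
//!         },
//!     }
//! }
//! ```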
//!
//! ## Progress of Time in the Simulation
//! The single-threaded unit test drives time forward on a host by invoking the host's
//! `run_tick` method, which runs a single tick of the host's Hydro/DFIR process. The unit test
//! is also responsible for invoking network message processing at times of its choosing, and
//! can interleave the progress of time on various hosts with network processing as it sees fit.
//! For convenience, `Fleet::run_single_tick_all_hosts` ticks every host once and then processes
//! the network, and `Fleet::run_until_quiescent` repeats this until no host does any work.
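//!
//! The round-based driver behind `Fleet::run_until_quiescent` can be modelled without any DFIR
//! machinery. In this hypothetical sketch, `CountdownHost` stands in for a host whose `run_tick`
//! reports work until a fixed budget of ticks is used up, and network processing is left as a
//! comment.
//!
//! ```rust
//! /// Hypothetical host that does one unit of work per tick until `remaining` reaches zero.
//! struct CountdownHost {
//!     remaining: u32,
//! }
//!
//! impl CountdownHost {
//!     /// Returns true if work was done during this tick.
//!     fn run_tick(&mut self) -> bool {
//!         if self.remaining == 0 {
//!             return false;
//!         }
//!         self.remaining -= 1;
//!         true
//!     }
//! }
//!
//! /// Ticks every host once per round until a round in which no host does any work.
//! /// Returns the number of rounds executed, including that final idle round.
//! fn run_until_quiescent(hosts: &mut [CountdownHost]) -> u32 {
//!     let mut rounds = 0;
//!     loop {
//!         rounds += 1;
//!         let mut work_done = false;
//!         for host in hosts.iter_mut() {
//!             // `|=` (not `||`) so that every host is ticked in every round.
//!             work_done |= host.run_tick();
//!         }
//!         // A real fleet would process network messages here, between rounds.
//!         if !work_done {
//!             return rounds;
//!         }
//!     }
//! }
//! ```
//!
//! With budgets of 2 and 5 ticks, the loop runs five busy rounds followed by one idle round.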
//!
//! ## Examples
//! See the `tests` module for examples of how to use the simulation framework.
use std::any::Any;
use std::collections::HashMap;
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::ready;
use std::pin::Pin;

use futures::{Sink, SinkExt, StreamExt, sink};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::Stream;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::trace;

use crate::scheduled::graph::Dfir;
use crate::util::{collect_ready_async, unbounded_channel};

/// A hostname is a unique identifier for a host in the simulation. It is used to address messages
/// to a specific host (and thus a specific Hydro/DFIR process).
pub type Hostname = String;

/// An interface name is a unique identifier for an inbox or an outbox on a host.
type InterfaceName = String;

/// An address is a combination of a hostname and an interface name.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Address {
    host: Hostname,
    interface: InterfaceName,
}

impl Address {
    /// Create a new address with the given hostname and interface name.
    pub fn new(host: Hostname, interface: InterfaceName) -> Self {
        Address { host, interface }
    }
}

/// A message sender is used to send messages to an inbox on a host.
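///
/// Inboxes are stored type-erased, so a sender receives each payload as a `Box<dyn Any>` and
/// must recover the concrete type, as the `UnboundedSender` implementation below does with
/// `Box::downcast`. The following standalone sketch shows that step in isolation; `unerase` is
/// a hypothetical helper and, unlike the implementation below, it hands the box back on a type
/// mismatch instead of panicking.
///
/// ```rust
/// use std::any::Any;
///
/// /// Recovers a `T` from a type-erased box, or returns the box unchanged on a type mismatch.
/// fn unerase<T: 'static>(msg: Box<dyn Any>) -> Result<T, Box<dyn Any>> {
///     msg.downcast::<T>().map(|boxed| *boxed)
/// }
/// ```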
pub trait MessageSender {
    /// Send a message to the inbox on the host.
    fn send(&self, message: MessageWithAddress);
}

impl<T: 'static> MessageSender for UnboundedSender<(T, Address)> {
    fn send(&self, message: (Box<dyn Any>, Address)) {
        match message.0.downcast::<T>() {
            Ok(msg) => {
                self.send((*msg, message.1)).unwrap();
            }
            Err(e) => {
                panic!("Failed to downcast message to expected type: {:?}", e);
            }
        }
    }
}

/// A message with a delivery address.
pub type MessageWithAddress = (Box<dyn Any>, Address);

/// An inbox is used by a host to receive messages for the process.
pub struct Inbox {
    sender: Box<dyn MessageSender>,
}

/// Processes can send messages to other processes by putting those messages in an outbox
/// on their host.
pub struct Outbox {
    receiver: Pin<Box<dyn Stream<Item = MessageWithAddress>>>,
}

/// A host is a single Hydro/DFIR process running in the simulation. It has a unique hostname
/// and can communicate with other hosts over the virtual network. It has a collection of inboxes
/// and outboxes.
pub struct Host {
    name: Hostname,
    process: Dfir<'static>,
    inputs: HashMap<InterfaceName, Inbox>,
    output: HashMap<InterfaceName, Outbox>,
}

impl Host {
    /// Run a single tick on the host's process. Returns true if any work was done by the
    /// process. This effectively "advances" time on the process.
    pub fn run_tick(&mut self) -> bool {
        self.process.run_tick()
    }
}

/// A builder for constructing a host in the simulation.
pub struct HostBuilder {
    name: Hostname,
    process: Option<Dfir<'static>>,
    inboxes: HashMap<InterfaceName, Inbox>,
    outboxes: HashMap<InterfaceName, Outbox>,
}

/// Used in conjunction with the `HostBuilder` to construct a host in the simulation.
pub struct ProcessBuilderContext<'context> {
    inboxes: &'context mut HashMap<InterfaceName, Inbox>,
    outboxes: &'context mut HashMap<InterfaceName, Outbox>,
}

/// Wraps a closure as a `Sink` that cannot fail: each item is passed to the closure, and the
/// resulting `()` is discarded by a draining sink.
fn sink_from_fn<T>(mut f: impl FnMut(T)) -> impl Sink<T, Error = Infallible> {
    sink::drain().with(move |item| {
        (f)(item);
        ready(Result::<(), Infallible>::Ok(()))
    })
}

impl ProcessBuilderContext<'_> {
    /// Create a new inbox on the host with the given interface name. Returns a stream that can
    /// be read by the process using the `source_stream` DFIR operator. Each item is a
    /// `(message, source)` pair, where `source` is the address of the outbox that sent it.
    pub fn new_inbox<T: 'static>(
        &mut self,
        interface: InterfaceName,
    ) -> UnboundedReceiverStream<(T, Address)> {
        let (sender, receiver) = unbounded_channel::<(T, Address)>();
        self.inboxes.insert(
            interface,
            Inbox {
                sender: Box::new(sender),
            },
        );
        receiver
    }

    /// Creates a new outbox on the host with the given interface name. Returns a sink that can
    /// be written to by the process using the `dest_sink` DFIR operator. Each item written is a
    /// `(message, destination)` pair.
    pub fn new_outbox<T: 'static>(
        &mut self,
        interface: InterfaceName,
    ) -> impl use<T> + Sink<(T, Address), Error = Infallible> {
        let (sender, receiver) = unbounded_channel::<(T, Address)>();

        let receiver = receiver.map(|(msg, addr)| (Box::new(msg) as Box<dyn Any>, addr));

        self.outboxes.insert(
            interface,
            Outbox {
                receiver: Box::pin(receiver),
            },
        );

        sink_from_fn(move |message: (T, Address)| sender.send((message.0, message.1)).unwrap())
    }
}

impl HostBuilder {
    /// Creates a new instance of `HostBuilder` for the given hostname.
    pub fn new(name: Hostname) -> Self {
        HostBuilder {
            name,
            process: None,
            inboxes: Default::default(),
            outboxes: Default::default(),
        }
    }

    /// Supplies the (mandatory) process that runs on this host.
    pub fn with_process<F>(mut self, builder: F) -> Self
    where
        F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
    {
        let mut context = ProcessBuilderContext {
            inboxes: &mut self.inboxes,
            outboxes: &mut self.outboxes,
        };
        let process = builder(&mut context);
        self.process = Some(process);
        self
    }

    /// Builds the host with the supplied configuration. Panics if no process was supplied via
    /// `with_process`.
    pub fn build(self) -> Host {
        let process = self.process.expect("Process is required to build a host");

        Host {
            name: self.name,
            process,
            inputs: self.inboxes,
            output: self.outboxes,
        }
    }
}

/// A fleet is a collection of hosts in the simulation. It is responsible for running the
/// simulation and processing network messages.
pub struct Fleet {
    hosts: HashMap<String, Host>,
}

impl Fleet {
    /// Creates a new instance of Fleet.
    pub fn new() -> Self {
        Fleet {
            hosts: HashMap::new(),
        }
    }

    /// Adds a new host to the fleet with the given name and process. Panics if a host with the
    /// same name already exists.
    pub fn add_host<F>(&mut self, name: String, process_builder: F) -> &Host
    where
        F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
    {
        let host = HostBuilder::new(name.clone())
            .with_process(process_builder)
            .build();
        assert!(
            self.hosts.insert(host.name.clone(), host).is_none(),
            "Host with name {} already exists",
            name
        );
        self.get_host(&name).unwrap()
    }

    /// Get a host by name.
    pub fn get_host(&self, name: &str) -> Option<&Host> {
        self.hosts.get(name)
    }

    /// Get a mutable reference to a host by name.
    pub fn get_host_mut(&mut self, name: &str) -> Option<&mut Host> {
        self.hosts.get_mut(name)
    }

    /// Advance time on all hosts by a single tick. Returns true if any work was done by any of the
    /// hosts. After ticking once on all the hosts, the method also processes network messages.
    ///
    /// The order in which hosts are ticked is not guaranteed; it follows `HashMap` iteration
    /// order.
    pub async fn run_single_tick_all_hosts(&mut self) -> bool {
        let mut work_done: bool = false;

        for (name, host) in self.hosts.iter_mut() {
            trace!("Running tick for host: {}", name);
            work_done |= host.run_tick();
        }

        self.process_network().await;

        work_done
    }

    /// Process all network messages in the simulation. This method picks up all messages from all
    /// outboxes on all hosts and delivers them to the corresponding inboxes on the destination
    /// hosts, tagging each message with the address of the outbox it was sent from. Messages
    /// addressed to an unknown host or interface are dropped and logged at the `trace` level.
    ///
    /// The order in which the messages are processed is not guaranteed.
    pub async fn process_network(&mut self) {
        let mut all_messages: Vec<(Address, MessageWithAddress)> = Vec::new();

        // Collect all messages from all outboxes on all hosts.
        for (name, host) in self.hosts.iter_mut() {
            for (interface, output) in host.output.iter_mut() {
                let src_address = Address::new(name.clone(), interface.clone());
                let all_messages_on_interface: Vec<_> =
                    collect_ready_async(&mut output.receiver).await;
                for message_on_interface in all_messages_on_interface {
                    all_messages.push((src_address.clone(), message_on_interface));
                }
            }
        }

        // Deliver all messages to the corresponding inboxes on the destination hosts.
        for (src_address, (msg, addr)) in all_messages {
            if let Some(destination_host) = self.hosts.get(&addr.host) {
                if let Some(input) = destination_host.inputs.get(&addr.interface) {
                    input.sender.send((msg, src_address.clone()));
                } else {
                    trace!(
                        "No interface named {:?} found on host {:?}. Dropping message {:?}.",
                        addr.interface, addr.host, msg
                    );
                }
            } else {
                trace!(
                    "No host named {:?} found. Dropping message {:?}.",
                    addr.host, msg
                );
            }
        }
    }

    /// Tick all hosts until all hosts are quiescent (i.e. no new work is done by any host). Ticking
    /// is done in "rounds". At each round, all hosts are ticked once and then network messages are
    /// processed. The process continues until no work is done by any host in a round.
    pub async fn run_until_quiescent(&mut self) {
        while self.run_single_tick_all_hosts().await {}
    }
}

impl Default for Fleet {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use dfir_macro::{dfir_syntax, dfir_test};
    use futures::StreamExt;

    use crate::util::simulation::{Address, Fleet, Hostname};
    use crate::util::unbounded_channel;

    /// A simple test to demonstrate use of the simulation framework. Implements an echo server
    /// and client.
    #[dfir_test]
    async fn test_echo() {
        let mut fleet = Fleet::new();

        // Hostnames for the server and client
        let server: Hostname = "server".to_string();
        let client: Hostname = "client".to_string();

        // Interface name for the echo "protocol"
        let interface: String = "echo".to_string();

        let server_address = Address::new(server.clone(), interface.clone());

        // Create the echo server
        fleet.add_host(server.clone(), |ctx| {
            let network_input = ctx.new_inbox::<String>(interface.clone());
            let network_output = ctx.new_outbox::<String>(interface.clone());
            // Echo every received message back to the address it came from.
            dfir_syntax! {
                out = dest_sink(network_output);

                source_stream(network_input)
                    -> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
                    -> out;
            }
        });

        // The client trigger channel is used to trigger the client into sending a message to the
        // server. This allows the unit test to control when the client sends a message.
        let (client_trigger_tx, client_trigger_rx) = unbounded_channel::<String>();
        let (client_response_tx, mut client_response_rx) = unbounded_channel::<String>();

        fleet.add_host(client.clone(), |ctx| {
            let network_out = ctx.new_outbox::<String>(interface.clone());
            let network_in = ctx.new_inbox::<String>(interface.clone());

            dfir_syntax! {
                out = dest_sink(network_out);

                source_stream(client_trigger_rx)
                    -> map(|msg| (msg, server_address.clone()))
                    -> out;

                source_stream(network_in)
                    -> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
                    -> for_each(|(msg, _addr)| client_response_tx.send(msg).unwrap());
            }
        });

        // Trigger the client to send a message.
        client_trigger_tx.send("Hello, world!".to_string()).unwrap();

        // Run the simulation until no new work is done by any host.
        fleet.run_until_quiescent().await;

        // Check that the echoed message was received by the client.
        let response = client_response_rx.next().await.unwrap();
        assert_eq!(response, "Hello, world!");
    }
}