1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
//! # Hydroflow Deterministic Simulation Testing Framework
//!
//! This module provides a deterministic simulation testing framework for testing Hydroflow
//! transducers.
//!
//! It can be used to test complex interactions between multiple Hydroflow transducers in a
//! deterministic manner by running them in a single-threaded environment. The framework also
//! provides a "virtual network" implementation that allows production transducers to exchange
//! messages within the simulation. More importantly, the network is fully under control of the
//! unit test and the test can introduce faults such as message delays, message drops and
//! network partitions.
//!
//! ## Overview
//!
//! Conceptually, the simulation contains a "Fleet", which is a collection of "Hosts". These
//! aren't real hosts, but rather a collection of individual Hydroflow transducers (one per host)
//! that can communicate with each other over a virtual network. Every host has a "hostname"
//! which uniquely identifies it within the fleet.
//!
//! ```text
//!  ┌───────────────────────────────────────────────────────────────────────────────────────────┐
//!  │SIMULATION                                                                                 │
//!  │ ┌───────────────────────────────────────────────────────────────────────────────────────┐ │
//!  │ │FLEET                                                                                  │ │
//!  │ │ ┌───────────────────────────────┐                   ┌───────────────────────────────┐ │ │
//!  │ │ │HOST                           │                   │HOST                           │ │ │
//!  │ │ │ ┌──────┐   ┌──────┐  ┌──────┐ │                   │ ┌──────┐   ┌──────┐  ┌──────┐ │ │ │
//!  │ │ │ │INBOX │   │INBOX │  │INBOX │ │                 ┌-┼-►INBOX │   │INBOX │  │INBOX │ │ │ │
//!  │ │ │ └──┬───┘   └──┬───┘  └──┬───┘ │                 │ │ └──┬───┘   └──┬───┘  └──┬───┘ │ │ │
//!  │ │ │ ┌──▼──────────▼─────────▼───┐ │                 │ │ ┌──▼──────────▼─────────▼───┐ │ │ │
//!  │ │ │ │                           │ │                 │ │ │                           │ │ │ │
//!  │ │ │ │        TRANSDUCER         │ │                 │ │ │        TRANSDUCER         │ │ │ │
//!  │ │ │ │                           │ │                 │ │ │                           │ │ │ │
//!  │ │ │ └───┬─────────┬──────────┬──┘ │                 │ │ └───┬─────────┬─────────┬───┘ │ │ │
//!  │ │ │  ┌──▼───┐  ┌──▼───┐  ┌───▼──┐ │                 │ │  ┌──▼───┐  ┌──▼───┐  ┌──▼───┐ │ │ │
//!  │ │ │  │OUTBOX│  │OUTBOX│  │OUTBOX┼-┼--┐              │ │  │OUTBOX│  │OUTBOX│  │OUTBOX│ │ │ │
//!  │ │ │  └──────┘  └──────┘  └──────┘ │  │              │ │  └──────┘  └──────┘  └──────┘ │ │ │
//!  │ │ └───────────────────────────────┘  │              │ └───────────────────────────────┘ │ │
//!  │ └────────────────────────────────────┼──────────────┼───────────────────────────────────┘ │
//!  │                                    ┌─┼──────────────┼─┐                                   │
//!  │                                    │ └--------------┘ │                                   │
//!  │                                    │ NETWORK MESSAGE  │                                   │
//!  │                                    │    PROCESSING    │                                   │
//!  │                                    └──────────────────┘                                   │
//!  └───────────────────────────────────────────────────────────────────────────────────────────┘
//! ```
//! ## Network Processing
//!
//! ### Outboxes & Inboxes
//! When a transducer wishes to send a message to another transducer, it sends the message to an
//! "outbox" on its host. The unit test invokes the simulation's network message processing logic
//! at some desired cadence to pick up all messages from all outboxes and deliver them to the
//! corresponding inboxes on the destination hosts. The network message processing logic is the
//! point at which failures can be injected to change the behavior of the network.
//!
//! ### Interface Names
//! Every inbox and outbox is associated with an "interface name". This is a string that uniquely
//! identifies the interface on the host. When a transducer sends a message, it specifies the
//! destination hostname and the interface name on that host to which the message should be
//! delivered.
//!
//! ## Progress of Time in the Simulation
//! The single-threaded unit test can drive time forward on every host by invoking the `run_tick`
//! method on the host. This ultimately runs a single tick on the transducer. The unit test is
//! also responsible for invoking the network message processing at the time of its choosing and
//! can interleave the progress of time on various hosts and network processing as it sees fit.
//!
//! ## Examples
//! Check the tests module for examples on how to use the simulation framework.
use std::any::Any;
use std::collections::HashMap;
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::ready;
use std::pin::Pin;

use futures::{sink, Sink, SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::Stream;
use tracing::trace;

use crate::scheduled::graph::Dfir;
use crate::util::{collect_ready_async, unbounded_channel};

/// A hostname is a unique identifier for a host in the simulation. It is used to address messages
/// to a specific host (and thus a specific Hydroflow transducer).
pub type Hostname = String;

/// An interface name is a unique identifier for an inbox or an outbox on host.
type InterfaceName = String;

/// An address is a combination of a hostname and an interface name.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Address {
    host: Hostname,
    interface: InterfaceName,
}

impl Address {
    /// Create a new address with the given hostname and interface name.
    pub fn new(host: Hostname, interface: InterfaceName) -> Self {
        Address { host, interface }
    }
}

/// A message sender is used to send messages to an inbox on a host.
pub trait MessageSender {
    /// Send a message to the inbox on the host.
    fn send(&self, message: MessageWithAddress);
}

impl<T: 'static> MessageSender for UnboundedSender<(T, Address)> {
    fn send(&self, message: (Box<dyn Any>, Address)) {
        match message.0.downcast::<T>() {
            Ok(msg) => {
                self.send((*msg, message.1)).unwrap();
            }
            Err(e) => {
                panic!("Failed to downcast message to expected type: {:?}", e);
            }
        }
    }
}

/// A message with an delivery address.
pub type MessageWithAddress = (Box<dyn Any>, Address);

/// An inbox is used by a host to receive messages for the transducer.
pub struct Inbox {
    sender: Box<dyn MessageSender>,
}

/// Transducers can send messages to other transducers by putting those messages in an outbox
/// on their host.
pub struct Outbox {
    receiver: Pin<Box<dyn Stream<Item = MessageWithAddress>>>,
}

/// A host is a single Hydroflow transducer running in the simulation. It has a unique hostname
/// and can communicate with other hosts over the virtual network. It has a collection of inboxes
/// and outboxes.
pub struct Host {
    name: Hostname,
    transducer: Dfir<'static>,
    inputs: HashMap<InterfaceName, Inbox>,
    output: HashMap<InterfaceName, Outbox>,
}

impl Host {
    /// Run a single tick on the host's transducer. Returns true if any work was done by the
    /// transducer. This effectively "advances" time on the transducer.
    pub fn run_tick(&mut self) -> bool {
        self.transducer.run_tick()
    }
}

/// A builder for constructing a host in the simulation.
pub struct HostBuilder {
    name: Hostname,
    transducer: Option<Dfir<'static>>,
    inboxes: HashMap<InterfaceName, Inbox>,
    outboxes: HashMap<InterfaceName, Outbox>,
}

/// Used in conjunction with the `HostBuilder` to construct a host in the simulation.
pub struct TransducerBuilderContext<'context> {
    inboxes: &'context mut HashMap<InterfaceName, Inbox>,
    outboxes: &'context mut HashMap<InterfaceName, Outbox>,
}

fn sink_from_fn<T>(mut f: impl FnMut(T)) -> impl Sink<T, Error = Infallible> {
    sink::drain().with(move |item| {
        (f)(item);
        ready(Result::<(), Infallible>::Ok(()))
    })
}

impl TransducerBuilderContext<'_> {
    /// Create a new inbox on the host with the given interface name. Returns a stream that can
    /// be read by the transducer using the source_stream dfir operator.
    pub fn new_inbox<T: 'static>(
        &mut self,
        interface: InterfaceName,
    ) -> UnboundedReceiverStream<(T, Address)> {
        let (sender, receiver) = unbounded_channel::<(T, Address)>();
        self.inboxes.insert(
            interface,
            Inbox {
                sender: Box::new(sender),
            },
        );
        receiver
    }

    /// Creates a new outbox on the host with the given interface name. Returns a sink that can
    /// be written to by the transducer using the dest_sink dfir operator.
    pub fn new_outbox<T: 'static>(
        &mut self,
        interface: InterfaceName,
    ) -> impl Sink<(T, Address), Error = Infallible> {
        let (sender, receiver) = unbounded_channel::<(T, Address)>();

        let receiver = receiver.map(|(msg, addr)| (Box::new(msg) as Box<dyn Any>, addr));

        self.outboxes.insert(
            interface,
            Outbox {
                receiver: Box::pin(receiver),
            },
        );

        sink_from_fn(move |message: (T, Address)| sender.send((message.0, message.1)).unwrap())
    }
}

impl HostBuilder {
    /// Creates a new instance of HostBuilder for a given hostname,
    pub fn new(name: Hostname) -> Self {
        HostBuilder {
            name,
            transducer: None,
            inboxes: Default::default(),
            outboxes: Default::default(),
        }
    }

    /// Supplies the (mandatory) transducer that runs on this host.
    pub fn with_transducer<F>(mut self, builder: F) -> Self
    where
        F: FnOnce(&mut TransducerBuilderContext) -> Dfir<'static>,
    {
        let mut context = TransducerBuilderContext {
            inboxes: &mut self.inboxes,
            outboxes: &mut self.outboxes,
        };
        let transducer = builder(&mut context);
        self.transducer = Some(transducer);
        self
    }

    /// Builds the host with the supplied configuration.
    pub fn build(self) -> Host {
        if self.transducer.is_none() {
            panic!("Transducer is required to build a host");
        }

        Host {
            name: self.name,
            transducer: self.transducer.unwrap(),
            inputs: self.inboxes,
            output: self.outboxes,
        }
    }
}

/// A fleet is a collection of hosts in the simulation. It is responsible for running the
/// simulation and processing network messages.
pub struct Fleet {
    hosts: HashMap<String, Host>,
}

impl Fleet {
    /// Creates a new instance of Fleet.
    pub fn new() -> Self {
        Fleet {
            hosts: HashMap::new(),
        }
    }

    /// Adds a new host to the fleet with the given name and transducer.
    pub fn add_host<F>(&mut self, name: String, transducer_builder: F) -> &Host
    where
        F: FnOnce(&mut TransducerBuilderContext) -> Dfir<'static>,
    {
        let host = HostBuilder::new(name.clone())
            .with_transducer(transducer_builder)
            .build();
        assert!(
            self.hosts.insert(host.name.clone(), host).is_none(),
            "Host with name {} already exists",
            name
        );
        self.get_host(&name).unwrap()
    }

    /// Get a host by name.
    pub fn get_host(&self, name: &str) -> Option<&Host> {
        self.hosts.get(name)
    }

    /// Get a host by name.
    pub fn get_host_mut(&mut self, name: &str) -> Option<&mut Host> {
        self.hosts.get_mut(name)
    }

    /// Advance time on all hosts by a single tick. Returns true if any work was done by any of the
    /// hosts. After ticking once on all the hosts, the method also processes network messages.
    ///
    /// The order in which the ticks are processed is not guaranteed.
    pub async fn run_single_tick_all_hosts(&mut self) -> bool {
        let mut work_done: bool = false;

        for (name, host) in self.hosts.iter_mut() {
            trace!("Running tick for host: {}", name);
            work_done |= host.run_tick();
        }

        self.process_network().await;

        work_done
    }

    /// Process all network messages in the simulation. This method picks up all messages from all
    /// outboxes on all hosts and delivers them to the corresponding inboxes on the destination.
    ///
    /// The order in which the messages are processed is not guaranteed.
    pub async fn process_network(&mut self) {
        let mut all_messages: Vec<(Address, MessageWithAddress)> = Vec::new();

        // Collect all messages from all outboxes on all hosts.
        for (name, host) in self.hosts.iter_mut() {
            for (interface, output) in host.output.iter_mut() {
                let src_address = Address::new(name.clone(), interface.clone());
                let all_messages_on_interface: Vec<_> =
                    collect_ready_async(&mut output.receiver).await;
                for message_on_interface in all_messages_on_interface {
                    all_messages.push((src_address.clone(), message_on_interface));
                }
            }
        }

        // Deliver all messages to the corresponding inboxes on the destination hosts.
        for (src_address, (msg, addr)) in all_messages {
            if let Some(destination_host) = self.hosts.get(&addr.host) {
                if let Some(input) = destination_host.inputs.get(&addr.interface) {
                    input.sender.send((msg, src_address.clone()));
                } else {
                    trace!(
                        "No interface named {:?} found on host {:?}. Dropping message {:?}.",
                        addr.interface,
                        addr.host,
                        msg
                    );
                }
            } else {
                trace!(
                    "No host named {:?} found. Dropping message {:?}.",
                    addr.host,
                    msg
                );
            }
        }
    }

    /// Tick all hosts until all hosts are quiescent (i.e. no new work is done by any host). Ticking
    /// is done in "rounds". At each round, all hosts are ticked once and then network messages are
    /// processed. The process continues until no work is done by any host in a round.
    pub async fn run_until_quiescent(&mut self) {
        while self.run_single_tick_all_hosts().await {}
    }
}

impl Default for Fleet {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use dfir_macro::{dfir_syntax, dfir_test};
    use futures::StreamExt;

    use crate::util::simulation::{Address, Fleet, Hostname};
    use crate::util::unbounded_channel;

    /// A simple test to demonstrate use of the simulation framework. Implements an echo server
    /// and client.
    #[dfir_test]
    async fn test_echo() {
        let mut fleet = Fleet::new();

        // Hostnames for the server and client
        let server: Hostname = "server".to_string();
        let client: Hostname = "client".to_string();

        // Interface name for the echo "protocol"
        let interface: String = "echo".to_string();

        let server_address = Address::new(server.clone(), interface.clone());

        // Create the echo server
        fleet.add_host(server.clone(), |ctx| {
            let network_input = ctx.new_inbox::<String>(interface.clone());
            let network_output = ctx.new_outbox::<String>(interface.clone());
            dfir_syntax! {
                out = dest_sink(network_output);

                source_stream(network_input)
                    -> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
                    -> out;
            }
        });

        // The client trigger channel is used to trigger the client into sending a message to the
        // server. This allows the unit test to control when the client sends a message.
        let (client_trigger_tx, client_trigger_rx) = unbounded_channel::<String>();
        let (client_response_tx, mut client_response_rx) = unbounded_channel::<String>();

        fleet.add_host(client.clone(), |ctx| {
            let network_out = ctx.new_outbox::<String>(interface.clone());
            let network_in = ctx.new_inbox::<String>(interface.clone());

            dfir_syntax! {
                out = dest_sink(network_out);

                source_stream(client_trigger_rx)
                    -> map(|msg| (msg, server_address.clone()))
                    -> out;

                source_stream(network_in)
                    -> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
                    -> for_each(|(msg, _addr)| client_response_tx.send(msg).unwrap());

            }
        });

        // Trigger the client to send a message.
        client_trigger_tx.send("Hello, world!".to_string()).unwrap();

        // Run the simulation until no new work is done by any host.
        fleet.run_until_quiescent().await;

        // Check that the message was received.
        let response = client_response_rx.next().await.unwrap();
        assert_eq!(response, "Hello, world!");
    }
}