gossip_server/config/
mod.rs

1use std::path::PathBuf;
2
3use config::{Config, ConfigError, File};
4use dfir_rs::futures::future::ready;
5use dfir_rs::futures::{Stream, StreamExt};
6use notify::{Event, EventHandler, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
7use serde::{Deserialize, Serialize};
8use tokio::sync::mpsc::UnboundedSender;
9use tracing::trace;
10
11/// L0 Data Store settings.
12#[derive(Debug, Deserialize, Serialize)]
13pub struct ServerSettings {
14    /// An initial set of "seed nodes" that can be used to bootstrap the gossip cluster. These
15    /// won't be the only nodes in the cluster, but they can be used to discover other nodes.
16    pub seed_nodes: Vec<SeedNodeSettings>,
17}
18
19const CONFIG_ROOT: &str = "config";
20const STATIC_CONFIG_PATH: &str = "static";
21const DYNAMIC_CONFIG_PATH: &str = "dynamic";
22
23fn static_config_path(subpath: &str) -> PathBuf {
24    PathBuf::from(CONFIG_ROOT)
25        .join(STATIC_CONFIG_PATH)
26        .join(subpath)
27}
28
29fn dynamic_config_path(subpath: &str) -> PathBuf {
30    PathBuf::from(CONFIG_ROOT)
31        .join(DYNAMIC_CONFIG_PATH)
32        .join(subpath)
33}
34
35impl ServerSettings {
36    /// Load the settings from the configuration files.
37    pub fn new() -> Result<Self, ConfigError> {
38        let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into());
39
40        let settings = Config::builder()
41            /* Load the default settings from the `config/default.toml` file. */
42            .add_source(File::from(static_config_path("default.toml")).required(false))
43
44            /* Load additional overrides based on context (alpha, beta, production, etc.), if they exist. */
45            .add_source(File::from(static_config_path(&run_mode)).required(false))
46
47            /* Load the local settings, if they exist. These are .gitignored to prevent accidental
48               check-in. */
49            .add_source(File::from(static_config_path("local")).required(false))
50
51            /* Load the dynamic settings, if they exist. These always override any static configuration*/
52            .add_source(File::from(dynamic_config_path("dynamic.toml")).required(false))
53            .build()?;
54
55        settings.try_deserialize()
56    }
57}
58
59/// A seed node that can be used to bootstrap the gossip cluster.
60#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Hash)]
61pub struct SeedNodeSettings {
62    /// The ID of the seed node.
63    pub id: String,
64
65    /// The address on which the seed node is listening for gossip messages.
66    pub address: String,
67}
68
69/// Setup a watcher for the settings files and return a stream of settings changes.
70///
71/// Returns the watcher, the initial settings, and a stream of settings changes. The watcher is
72/// returned so that it can be kept alive for the lifetime of the application. Also returns a
73/// snapshot of the current settings.
74pub fn setup_settings_watch() -> (
75    RecommendedWatcher,
76    ServerSettings,
77    impl Stream<Item = ServerSettings>,
78) {
79    let (tx, rx) = dfir_rs::util::unbounded_channel();
80
81    // Setup the watcher
82    let mut watcher = notify::RecommendedWatcher::new(
83        UnboundedSenderEventHandler::new(tx),
84        notify::Config::default(),
85    )
86    .unwrap();
87    watcher
88        .watch(&PathBuf::from(CONFIG_ROOT), RecursiveMode::Recursive)
89        .unwrap();
90
91    // Read initial settings
92    let initial_settings = ServerSettings::new().unwrap();
93
94    let change_stream = rx
95        .map(Result::unwrap)
96        .map(|event| {
97            trace!("Event: {:?}", event);
98            match event.kind {
99                EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) => {
100                    Some(ServerSettings::new().unwrap())
101                }
102                _ => {
103                    trace!("Unhandled event: {:?}", event);
104                    None
105                }
106            }
107        })
108        .filter_map(ready);
109
110    // If the watcher is dropped, the stream will stop producing events. So, returning the watcher.
111    (watcher, initial_settings, change_stream)
112}
113
114/// Wraps an UnboundedSender to implement the notify::EventHandler trait. This allows sending
115/// file notification evnts to UnboundedSender instances.
116struct UnboundedSenderEventHandler {
117    tx: UnboundedSender<notify::Result<Event>>,
118}
119
120impl UnboundedSenderEventHandler {
121    fn new(tx: UnboundedSender<notify::Result<Event>>) -> Self {
122        Self { tx }
123    }
124}
125
126impl EventHandler for UnboundedSenderEventHandler {
127    fn handle_event(&mut self, event: notify::Result<Event>) {
128        self.tx.send(event).unwrap();
129    }
130}