gossip_cli/
main.rs

1use std::net::SocketAddr;
2
3use clap::{CommandFactory, Parser, Subcommand};
4use dfir_rs::util::{bind_udp_bytes, ipv4_resolve};
5use dfir_rs::{DemuxEnum, dfir_syntax, tokio};
6use gossip_kv::{ClientRequest, ClientResponse, Key};
7use tracing::error;
8
9/// CLI program to interact with Layer 0 gossip store.
10#[derive(Debug, Parser)]
11struct Opts {
12    #[clap(short, long, help = "Server address to connect to.")]
13    server_address: Option<String>,
14}
15
16/// Dummy app for using clap to process commands for interactive CLI.
17#[derive(Debug, Parser)]
18#[command(multicall = true)]
19struct InteractiveApp {
20    #[clap(subcommand)]
21    commands: InteractiveCommands,
22}
23
24#[derive(Debug, Subcommand, DemuxEnum)]
25enum InteractiveCommands {
26    /// Get a value from the store.
27    Get {
28        #[arg(value_parser = parse_key, required = true, help = "Key to get")]
29        key: Key,
30    },
31    /// Upsert a value in the store.
32    Set {
33        #[arg(value_parser = parse_key, required = true, help = "Key to set")]
34        key: Key,
35        value: String,
36    },
37    /// Delete a value from the store.
38    Delete {
39        #[arg(value_parser = parse_key, required = true, help = "Key to delete")]
40        key: Key,
41    },
42    /// Exit the application.
43    Exit,
44}
45
46/// Allows clap to parse Keys from user input.
47fn parse_key(s: &str) -> Result<Key, String> {
48    s.parse::<Key>().map_err(|e| e.to_string())
49}
50
51/// Parse a command from a line of input.
52fn parse_command(line: String) -> Option<InteractiveCommands> {
53    // Override how help is handled.
54    if line.trim() == "help" {
55        InteractiveApp::command()
56            .help_template("\nAvailable Commands: \n{subcommands}")
57            .print_help()
58            .unwrap();
59        return None;
60    }
61
62    // Split quoted string into parts.
63    let line_parts = shlex::split(&line);
64
65    if line_parts.is_none() {
66        error!("\nUnable to parse command.");
67        return None;
68    }
69
70    // Provide split parts to clap to process.
71    let maybe_parsed = InteractiveApp::try_parse_from(line_parts.unwrap());
72
73    match maybe_parsed {
74        Err(e) => {
75            // Problem with the parsed result. This displays some help.
76            error!("\n{}", e);
77            None
78        }
79        Ok(cli) => Some(cli.commands),
80    }
81}
82
83#[dfir_rs::main]
84async fn main() {
85    tracing_subscriber::fmt::init();
86
87    let opts = Opts::parse();
88
89    // Bind to OS-assigned port on localhost.
90    let address = ipv4_resolve("0.0.0.0:0").unwrap();
91
92    // Default to localhost:3000 if not provided.
93    let server_address = opts.server_address.map_or_else(
94        || ipv4_resolve("localhost:3001").unwrap(),
95        |s| ipv4_resolve(&s).unwrap(),
96    );
97
98    // Setup UDP sockets for communication.
99    let (outbound, inbound, _) = bind_udp_bytes(address).await;
100
101    let mut cli = dfir_syntax! {
102        inbound_messages = source_stream_serde(inbound) -> map(Result::unwrap) -> for_each(|(response, _addr): (ClientResponse, SocketAddr)| println!("{:?}", response));
103
104        outbound_messages = union() -> dest_sink_serde(outbound);
105
106        // Parse commands from stdin.
107        commands = source_stdin()
108            -> filter_map(|line| parse_command(line.unwrap()))
109            -> demux_enum::<InteractiveCommands>();
110
111        commands[Get] -> map(|(key,)| (ClientRequest::Get {key}, server_address)) -> outbound_messages;
112        commands[Set] -> map(|(key, value)| (ClientRequest::Set {key, value}, server_address)) -> outbound_messages;
113        commands[Delete] -> map(|(key,)| (ClientRequest::Delete {key}, server_address)) -> outbound_messages;
114        commands[Exit] -> for_each(|()| std::process::exit(0)); // TODO: Graceful shutdown https://github.com/hydro-project/hydro/issues/1253
115
116    };
117
118    cli.run_async().await;
119}