gossip_cli/
main.rs
1use std::net::SocketAddr;
2
3use clap::{CommandFactory, Parser, Subcommand};
4use dfir_rs::util::{bind_udp_bytes, ipv4_resolve};
5use dfir_rs::{DemuxEnum, dfir_syntax, tokio};
6use gossip_kv::{ClientRequest, ClientResponse, Key};
7use tracing::error;
8
9#[derive(Debug, Parser)]
11struct Opts {
12 #[clap(short, long, help = "Server address to connect to.")]
13 server_address: Option<String>,
14}
15
16#[derive(Debug, Parser)]
18#[command(multicall = true)]
19struct InteractiveApp {
20 #[clap(subcommand)]
21 commands: InteractiveCommands,
22}
23
24#[derive(Debug, Subcommand, DemuxEnum)]
25enum InteractiveCommands {
26 Get {
28 #[arg(value_parser = parse_key, required = true, help = "Key to get")]
29 key: Key,
30 },
31 Set {
33 #[arg(value_parser = parse_key, required = true, help = "Key to set")]
34 key: Key,
35 value: String,
36 },
37 Delete {
39 #[arg(value_parser = parse_key, required = true, help = "Key to delete")]
40 key: Key,
41 },
42 Exit,
44}
45
46fn parse_key(s: &str) -> Result<Key, String> {
48 s.parse::<Key>().map_err(|e| e.to_string())
49}
50
51fn parse_command(line: String) -> Option<InteractiveCommands> {
53 if line.trim() == "help" {
55 InteractiveApp::command()
56 .help_template("\nAvailable Commands: \n{subcommands}")
57 .print_help()
58 .unwrap();
59 return None;
60 }
61
62 let line_parts = shlex::split(&line);
64
65 if line_parts.is_none() {
66 error!("\nUnable to parse command.");
67 return None;
68 }
69
70 let maybe_parsed = InteractiveApp::try_parse_from(line_parts.unwrap());
72
73 match maybe_parsed {
74 Err(e) => {
75 error!("\n{}", e);
77 None
78 }
79 Ok(cli) => Some(cli.commands),
80 }
81}
82
83#[dfir_rs::main]
84async fn main() {
85 tracing_subscriber::fmt::init();
86
87 let opts = Opts::parse();
88
89 let address = ipv4_resolve("0.0.0.0:0").unwrap();
91
92 let server_address = opts.server_address.map_or_else(
94 || ipv4_resolve("localhost:3001").unwrap(),
95 |s| ipv4_resolve(&s).unwrap(),
96 );
97
98 let (outbound, inbound, _) = bind_udp_bytes(address).await;
100
101 let mut cli = dfir_syntax! {
102 inbound_messages = source_stream_serde(inbound) -> map(Result::unwrap) -> for_each(|(response, _addr): (ClientResponse, SocketAddr)| println!("{:?}", response));
103
104 outbound_messages = union() -> dest_sink_serde(outbound);
105
106 commands = source_stdin()
108 -> filter_map(|line| parse_command(line.unwrap()))
109 -> demux_enum::<InteractiveCommands>();
110
111 commands[Get] -> map(|(key,)| (ClientRequest::Get {key}, server_address)) -> outbound_messages;
112 commands[Set] -> map(|(key, value)| (ClientRequest::Set {key, value}, server_address)) -> outbound_messages;
113 commands[Delete] -> map(|(key,)| (ClientRequest::Delete {key}, server_address)) -> outbound_messages;
114 commands[Exit] -> for_each(|()| std::process::exit(0)); };
117
118 cli.run_async().await;
119}