latency_measure/
latency_measure.rs1use std::collections::HashMap;
2use std::io::Write;
3use std::sync::atomic::{AtomicBool, AtomicU64};
4use std::sync::{Arc, mpsc};
5use std::thread;
6use std::time::Instant;
7
8use dfir_rs::bytes::Bytes;
9use dfir_rs::util::deploy::{ConnectedDirect, ConnectedSink, ConnectedSource};
10use dfir_rs::util::{deserialize_from_bytes, serialize_to_bytes};
11use futures::{SinkExt, StreamExt};
12pub use topolotree::protocol::*;
13
14#[tokio::main]
15async fn main() {
16 let ports = dfir_rs::util::deploy::init::<()>().await;
17 let mut start_node = ports
18 .port("increment_start_node")
19 .connect::<ConnectedDirect>()
20 .into_sink();
21
22 let mut end_node = ports
23 .port("end_node_query")
24 .connect::<ConnectedDirect>()
25 .into_source();
26
27 let num_clients: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
28 let partition_n: u64 = std::env::args().nth(2).unwrap().parse().unwrap();
29 let keys_per_partition: u64 = std::env::args().nth(3).unwrap().parse().unwrap();
30
31 let atomic_counter = Arc::new(AtomicU64::new(0));
32 let atomic_borrow = atomic_counter.clone();
33 let atomic_keep_running = Arc::new(AtomicBool::new(true));
34 let atomic_keep_running_clone = atomic_keep_running.clone();
35 let (latency_sender, latency_receiver) = mpsc::channel::<u128>();
36 let printer_thread = thread::spawn(move || {
37 let mut last_instant = Instant::now();
38 while atomic_keep_running_clone.load(std::sync::atomic::Ordering::Relaxed) {
39 thread::sleep(std::time::Duration::from_millis(100));
40 let now = Instant::now();
41 let counter = atomic_borrow.swap(0, std::sync::atomic::Ordering::Relaxed);
42 let elapsed = now - last_instant;
43 last_instant = now;
44 println!("throughput,{},{}", counter, elapsed.as_secs_f64());
45
46 while let Ok(latency) = latency_receiver.try_recv() {
47 println!("latency,{}", latency);
48 }
49
50 std::io::stdout().flush().unwrap()
51 }
52 });
53
54 let (inc_sender, mut inc_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
55 tokio::spawn(async move {
56 loop {
57 let value = inc_receiver.recv().await.unwrap();
58 start_node.send(value).await.unwrap();
59 }
60 });
61
62 let mut queues = vec![];
63
64 for i in 0..num_clients {
65 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<i64>();
66 queues.push(sender);
67
68 let inc_sender = inc_sender.clone();
69 let latency_sender = latency_sender.clone();
70 let atomic_counter = atomic_counter.clone();
71 let keep_running = atomic_keep_running.clone();
72 tokio::spawn(async move {
73 #[cfg(debug_assertions)]
74 let mut count_tracker = HashMap::new();
75
76 let mut next_base: u64 = 0;
77
78 while keep_running.load(std::sync::atomic::Ordering::Relaxed) {
79 let id = (partition_n * keys_per_partition)
80 + ((((next_base % keys_per_partition) / num_clients) * num_clients) + i);
81 next_base = next_base.wrapping_add(1);
82 let increment = rand::random::<bool>();
83 let change = if increment { 1 } else { -1 };
84 let start = Instant::now();
85 inc_sender
86 .send(serialize_to_bytes(OperationPayload { key: id, change }))
87 .unwrap();
88
89 let received = receiver.recv().await.unwrap();
90 #[cfg(debug_assertions)]
91 {
92 let count = count_tracker.entry(id).or_insert(0);
93 *count += change;
94 assert_eq!(*count, received);
95 }
96
97 if next_base.is_multiple_of(100) {
98 latency_sender.send(start.elapsed().as_micros()).unwrap();
99 }
100
101 atomic_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
102 }
103 });
104 }
105
106 tokio::spawn(async move {
107 loop {
108 let updated =
109 deserialize_from_bytes::<QueryResponse>(end_node.next().await.unwrap().unwrap())
110 .unwrap();
111
112 if updated.key / keys_per_partition != partition_n {
113 continue;
114 }
115
116 if queues[((updated.key % keys_per_partition) % num_clients) as usize]
117 .send(updated.value)
118 .is_err()
119 {
120 break;
121 }
122 }
123 });
124
125 let mut line = String::new();
126 std::io::stdin().read_line(&mut line).unwrap();
127 assert!(line.starts_with("stop"));
128
129 atomic_keep_running.store(false, std::sync::atomic::Ordering::Relaxed);
130 printer_thread.join().unwrap();
131
132 println!("end");
133
134 std::process::exit(0);
135}