latency_measure/
latency_measure.rs
1use std::collections::HashMap;
2use std::io::Write;
3use std::sync::atomic::{AtomicBool, AtomicU64};
4use std::sync::{Arc, mpsc};
5use std::thread;
6use std::time::Instant;
7
8use dfir_rs::bytes::Bytes;
9use dfir_rs::util::deploy::{ConnectedDirect, ConnectedSink, ConnectedSource};
10use dfir_rs::util::{deserialize_from_bytes, serialize_to_bytes};
11use futures::{SinkExt, StreamExt};
12
13mod protocol;
14use protocol::*;
15
16#[tokio::main]
17async fn main() {
18 let ports = dfir_rs::util::deploy::init::<()>().await;
19 let mut start_node = ports
20 .port("increment_start_node")
21 .connect::<ConnectedDirect>()
22 .await
23 .into_sink();
24
25 let mut end_node = ports
26 .port("end_node_query")
27 .connect::<ConnectedDirect>()
28 .await
29 .into_source();
30
31 let num_clients: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
32 let partition_n: u64 = std::env::args().nth(2).unwrap().parse().unwrap();
33 let keys_per_partition: u64 = std::env::args().nth(3).unwrap().parse().unwrap();
34
35 let atomic_counter = Arc::new(AtomicU64::new(0));
36 let atomic_borrow = atomic_counter.clone();
37 let atomic_keep_running = Arc::new(AtomicBool::new(true));
38 let atomic_keep_running_clone = atomic_keep_running.clone();
39 let (latency_sender, latency_receiver) = mpsc::channel::<u128>();
40 let printer_thread = thread::spawn(move || {
41 let mut last_instant = Instant::now();
42 while atomic_keep_running_clone.load(std::sync::atomic::Ordering::Relaxed) {
43 thread::sleep(std::time::Duration::from_millis(100));
44 let now = Instant::now();
45 let counter = atomic_borrow.swap(0, std::sync::atomic::Ordering::Relaxed);
46 let elapsed = now - last_instant;
47 last_instant = now;
48 println!("throughput,{},{}", counter, elapsed.as_secs_f64());
49
50 while let Ok(latency) = latency_receiver.try_recv() {
51 println!("latency,{}", latency);
52 }
53
54 std::io::stdout().flush().unwrap()
55 }
56 });
57
58 let (inc_sender, mut inc_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
59 tokio::spawn(async move {
60 loop {
61 let value = inc_receiver.recv().await.unwrap();
62 start_node.send(value).await.unwrap();
63 }
64 });
65
66 let mut queues = vec![];
67
68 for i in 0..num_clients {
69 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<i64>();
70 queues.push(sender);
71
72 let inc_sender = inc_sender.clone();
73 let latency_sender = latency_sender.clone();
74 let atomic_counter = atomic_counter.clone();
75 let keep_running = atomic_keep_running.clone();
76 tokio::spawn(async move {
77 #[cfg(debug_assertions)]
78 let mut count_tracker = HashMap::new();
79
80 let mut next_base: u64 = 0;
81
82 while keep_running.load(std::sync::atomic::Ordering::Relaxed) {
83 let id = (partition_n * keys_per_partition)
84 + ((((next_base % keys_per_partition) / num_clients) * num_clients) + i);
85 next_base = next_base.wrapping_add(1);
86 let increment = rand::random::<bool>();
87 let change = if increment { 1 } else { -1 };
88 let start = Instant::now();
89 inc_sender
90 .send(serialize_to_bytes(OperationPayload { key: id, change }))
91 .unwrap();
92
93 let received = receiver.recv().await.unwrap();
94 #[cfg(debug_assertions)]
95 {
96 let count = count_tracker.entry(id).or_insert(0);
97 *count += change;
98 assert_eq!(*count, received);
99 }
100
101 if next_base % 100 == 0 {
102 latency_sender.send(start.elapsed().as_micros()).unwrap();
103 }
104
105 atomic_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
106 }
107 });
108 }
109
110 tokio::spawn(async move {
111 loop {
112 let updated =
113 deserialize_from_bytes::<QueryResponse>(end_node.next().await.unwrap().unwrap())
114 .unwrap();
115
116 if updated.key / keys_per_partition != partition_n {
117 continue;
118 }
119
120 if queues[((updated.key % keys_per_partition) % num_clients) as usize]
121 .send(updated.value)
122 .is_err()
123 {
124 break;
125 }
126 }
127 });
128
129 let mut line = String::new();
130 std::io::stdin().read_line(&mut line).unwrap();
131 assert!(line.starts_with("stop"));
132
133 atomic_keep_running.store(false, std::sync::atomic::Ordering::Relaxed);
134 printer_thread.join().unwrap();
135
136 println!("end");
137
138 std::process::exit(0);
139}