latency_measure/
latency_measure.rs

1use std::collections::HashMap;
2use std::io::Write;
3use std::sync::atomic::{AtomicBool, AtomicU64};
4use std::sync::{Arc, mpsc};
5use std::thread;
6use std::time::Instant;
7
8use dfir_rs::bytes::Bytes;
9use dfir_rs::util::deploy::{ConnectedDirect, ConnectedSink, ConnectedSource};
10use dfir_rs::util::{deserialize_from_bytes, serialize_to_bytes};
11use futures::{SinkExt, StreamExt};
12
13mod protocol;
14use protocol::*;
15
16#[tokio::main]
17async fn main() {
18    let ports = dfir_rs::util::deploy::init::<()>().await;
19    let mut start_node = ports
20        .port("increment_start_node")
21        .connect::<ConnectedDirect>()
22        .await
23        .into_sink();
24
25    let mut end_node = ports
26        .port("end_node_query")
27        .connect::<ConnectedDirect>()
28        .await
29        .into_source();
30
31    let num_clients: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
32    let partition_n: u64 = std::env::args().nth(2).unwrap().parse().unwrap();
33    let keys_per_partition: u64 = std::env::args().nth(3).unwrap().parse().unwrap();
34
35    let atomic_counter = Arc::new(AtomicU64::new(0));
36    let atomic_borrow = atomic_counter.clone();
37    let atomic_keep_running = Arc::new(AtomicBool::new(true));
38    let atomic_keep_running_clone = atomic_keep_running.clone();
39    let (latency_sender, latency_receiver) = mpsc::channel::<u128>();
40    let printer_thread = thread::spawn(move || {
41        let mut last_instant = Instant::now();
42        while atomic_keep_running_clone.load(std::sync::atomic::Ordering::Relaxed) {
43            thread::sleep(std::time::Duration::from_millis(100));
44            let now = Instant::now();
45            let counter = atomic_borrow.swap(0, std::sync::atomic::Ordering::Relaxed);
46            let elapsed = now - last_instant;
47            last_instant = now;
48            println!("throughput,{},{}", counter, elapsed.as_secs_f64());
49
50            while let Ok(latency) = latency_receiver.try_recv() {
51                println!("latency,{}", latency);
52            }
53
54            std::io::stdout().flush().unwrap()
55        }
56    });
57
58    let (inc_sender, mut inc_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
59    tokio::spawn(async move {
60        loop {
61            let value = inc_receiver.recv().await.unwrap();
62            start_node.send(value).await.unwrap();
63        }
64    });
65
66    let mut queues = vec![];
67
68    for i in 0..num_clients {
69        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<i64>();
70        queues.push(sender);
71
72        let inc_sender = inc_sender.clone();
73        let latency_sender = latency_sender.clone();
74        let atomic_counter = atomic_counter.clone();
75        let keep_running = atomic_keep_running.clone();
76        tokio::spawn(async move {
77            #[cfg(debug_assertions)]
78            let mut count_tracker = HashMap::new();
79
80            let mut next_base: u64 = 0;
81
82            while keep_running.load(std::sync::atomic::Ordering::Relaxed) {
83                let id = (partition_n * keys_per_partition)
84                    + ((((next_base % keys_per_partition) / num_clients) * num_clients) + i);
85                next_base = next_base.wrapping_add(1);
86                let increment = rand::random::<bool>();
87                let change = if increment { 1 } else { -1 };
88                let start = Instant::now();
89                inc_sender
90                    .send(serialize_to_bytes(OperationPayload { key: id, change }))
91                    .unwrap();
92
93                let received = receiver.recv().await.unwrap();
94                #[cfg(debug_assertions)]
95                {
96                    let count = count_tracker.entry(id).or_insert(0);
97                    *count += change;
98                    assert_eq!(*count, received);
99                }
100
101                if next_base % 100 == 0 {
102                    latency_sender.send(start.elapsed().as_micros()).unwrap();
103                }
104
105                atomic_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
106            }
107        });
108    }
109
110    tokio::spawn(async move {
111        loop {
112            let updated =
113                deserialize_from_bytes::<QueryResponse>(end_node.next().await.unwrap().unwrap())
114                    .unwrap();
115
116            if updated.key / keys_per_partition != partition_n {
117                continue;
118            }
119
120            if queues[((updated.key % keys_per_partition) % num_clients) as usize]
121                .send(updated.value)
122                .is_err()
123            {
124                break;
125            }
126        }
127    });
128
129    let mut line = String::new();
130    std::io::stdin().read_line(&mut line).unwrap();
131    assert!(line.starts_with("stop"));
132
133    atomic_keep_running.store(false, std::sync::atomic::Ordering::Relaxed);
134    printer_thread.join().unwrap();
135
136    println!("end");
137
138    std::process::exit(0);
139}