latency_measure/
latency_measure.rs

1use std::collections::HashMap;
2use std::io::Write;
3use std::sync::atomic::{AtomicBool, AtomicU64};
4use std::sync::{Arc, mpsc};
5use std::thread;
6use std::time::Instant;
7
8use dfir_rs::bytes::Bytes;
9use dfir_rs::util::deploy::{ConnectedDirect, ConnectedSink, ConnectedSource};
10use dfir_rs::util::{deserialize_from_bytes, serialize_to_bytes};
11use futures::{SinkExt, StreamExt};
12pub use topolotree::protocol::*;
13
14#[tokio::main]
15async fn main() {
16    let ports = dfir_rs::util::deploy::init::<()>().await;
17    let mut start_node = ports
18        .port("increment_start_node")
19        .connect::<ConnectedDirect>()
20        .into_sink();
21
22    let mut end_node = ports
23        .port("end_node_query")
24        .connect::<ConnectedDirect>()
25        .into_source();
26
27    let num_clients: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
28    let partition_n: u64 = std::env::args().nth(2).unwrap().parse().unwrap();
29    let keys_per_partition: u64 = std::env::args().nth(3).unwrap().parse().unwrap();
30
31    let atomic_counter = Arc::new(AtomicU64::new(0));
32    let atomic_borrow = atomic_counter.clone();
33    let atomic_keep_running = Arc::new(AtomicBool::new(true));
34    let atomic_keep_running_clone = atomic_keep_running.clone();
35    let (latency_sender, latency_receiver) = mpsc::channel::<u128>();
36    let printer_thread = thread::spawn(move || {
37        let mut last_instant = Instant::now();
38        while atomic_keep_running_clone.load(std::sync::atomic::Ordering::Relaxed) {
39            thread::sleep(std::time::Duration::from_millis(100));
40            let now = Instant::now();
41            let counter = atomic_borrow.swap(0, std::sync::atomic::Ordering::Relaxed);
42            let elapsed = now - last_instant;
43            last_instant = now;
44            println!("throughput,{},{}", counter, elapsed.as_secs_f64());
45
46            while let Ok(latency) = latency_receiver.try_recv() {
47                println!("latency,{}", latency);
48            }
49
50            std::io::stdout().flush().unwrap()
51        }
52    });
53
54    let (inc_sender, mut inc_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
55    tokio::spawn(async move {
56        loop {
57            let value = inc_receiver.recv().await.unwrap();
58            start_node.send(value).await.unwrap();
59        }
60    });
61
62    let mut queues = vec![];
63
64    for i in 0..num_clients {
65        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<i64>();
66        queues.push(sender);
67
68        let inc_sender = inc_sender.clone();
69        let latency_sender = latency_sender.clone();
70        let atomic_counter = atomic_counter.clone();
71        let keep_running = atomic_keep_running.clone();
72        tokio::spawn(async move {
73            #[cfg(debug_assertions)]
74            let mut count_tracker = HashMap::new();
75
76            let mut next_base: u64 = 0;
77
78            while keep_running.load(std::sync::atomic::Ordering::Relaxed) {
79                let id = (partition_n * keys_per_partition)
80                    + ((((next_base % keys_per_partition) / num_clients) * num_clients) + i);
81                next_base = next_base.wrapping_add(1);
82                let increment = rand::random::<bool>();
83                let change = if increment { 1 } else { -1 };
84                let start = Instant::now();
85                inc_sender
86                    .send(serialize_to_bytes(OperationPayload { key: id, change }))
87                    .unwrap();
88
89                let received = receiver.recv().await.unwrap();
90                #[cfg(debug_assertions)]
91                {
92                    let count = count_tracker.entry(id).or_insert(0);
93                    *count += change;
94                    assert_eq!(*count, received);
95                }
96
97                if next_base.is_multiple_of(100) {
98                    latency_sender.send(start.elapsed().as_micros()).unwrap();
99                }
100
101                atomic_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
102            }
103        });
104    }
105
106    tokio::spawn(async move {
107        loop {
108            let updated =
109                deserialize_from_bytes::<QueryResponse>(end_node.next().await.unwrap().unwrap())
110                    .unwrap();
111
112            if updated.key / keys_per_partition != partition_n {
113                continue;
114            }
115
116            if queues[((updated.key % keys_per_partition) % num_clients) as usize]
117                .send(updated.value)
118                .is_err()
119            {
120                break;
121            }
122        }
123    });
124
125    let mut line = String::new();
126    std::io::stdin().read_line(&mut line).unwrap();
127    assert!(line.starts_with("stop"));
128
129    atomic_keep_running.store(false, std::sync::atomic::Ordering::Relaxed);
130    printer_thread.join().unwrap();
131
132    println!("end");
133
134    std::process::exit(0);
135}