hydro_test/external_client/
http_counter.rs1use hydro_lang::live_collections::stream::{NoOrder, TotalOrder};
2use hydro_lang::prelude::*;
3
/// The action requested by a client, as decoded from the HTTP request line.
///
/// Derives `PartialEq`/`Eq` so that parsed requests can be compared directly
/// (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RequestType {
    /// `POST /increment/{key}`: add one to the counter stored under `key`.
    Increment { key: i32 },
    /// `GET /get/{key}`: read the counter stored under `key`.
    Get { key: i32 },
    /// Any request that matches neither of the routes above, or whose key is
    /// not a valid `i32`.
    Invalid,
}
10
11#[derive(Debug, Clone)]
12pub struct ParsedRequest {
13 pub connection_id: u64,
14 pub request_type: RequestType,
15 pub raw_request: String,
16}
17
18pub fn http_counter_server<'a, P>(
19 in_stream: KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder>,
20) -> KeyedStream<u64, String, Process<'a, P>, Unbounded, NoOrder> {
21 let parsed_requests = in_stream
22 .fold_early_stop(
23 q!(|| String::new()),
24 q!(|buffer, line| {
25 buffer.push_str(&line);
26 buffer.push_str("\r\n");
27 line.trim().is_empty()
29 }),
30 )
31 .map_with_key(q!(|(connection_id, raw_request)| {
32 let lines: Vec<&str> = raw_request.lines().collect();
33 let request_line = lines.first().unwrap_or(&"");
34 let parts: Vec<&str> = request_line.split_whitespace().collect();
35 let method = parts.first().unwrap_or(&"GET");
36 let path = parts.get(1).unwrap_or(&"/");
37
38 let request_type = if method == &"POST" && path.starts_with("/increment/") {
39 if let Ok(key) = path[11..].parse::<i32>() {
40 RequestType::Increment { key }
41 } else {
42 RequestType::Invalid
43 }
44 } else if method == &"GET" && path.starts_with("/get/") {
45 if let Ok(key) = path[5..].parse::<i32>() {
46 RequestType::Get { key }
47 } else {
48 RequestType::Invalid
49 }
50 } else {
51 RequestType::Invalid
52 };
53
54 ParsedRequest {
55 connection_id,
56 request_type,
57 raw_request,
58 }
59 }));
60
61 let increment_stream = parsed_requests
62 .clone()
63 .filter_map(q!(|req| match req.request_type {
64 RequestType::Increment { key } => Some(key),
65 _ => None,
66 }))
67 .atomic();
68
69 let get_stream = parsed_requests
70 .clone()
71 .filter_map(q!(|req| match req.request_type {
72 RequestType::Get { key } => Some(key),
73 _ => None,
74 }));
75
76 let invalid_requests = parsed_requests.filter_map(q!(|req| match req.request_type {
77 RequestType::Invalid => Some(req.raw_request),
78 _ => None,
79 }));
80
81 let counters = increment_stream
82 .clone()
83 .values()
84 .map(q!(|key| (key, ())))
85 .into_keyed()
86 .value_counts();
87
88 let lookup_result = sliced! {
89 let batch_get_requests = use(get_stream, nondet!());
90 let cur_counters = use::atomic(counters, nondet!());
91
92 batch_get_requests.lookup_keyed_singleton(cur_counters).into_keyed_stream()
93 };
94 let get_responses =
95 lookup_result.map(q!(|(key, maybe_count)| if let Some(count) = maybe_count {
96 format!(
97 "HTTP/1.1 200 OK\r\n\
98 Content-Type: application/json\r\n\
99 Content-Length: {}\r\n\
100 Connection: close\r\n\
101 \r\n\
102 {{\"key\": {}, \"count\": {}}}",
103 format!("{{\"key\": {}, \"count\": {}}}", key, count).len(),
104 key,
105 count
106 )
107 } else {
108 format!(
109 "HTTP/1.1 200 OK\r\n\
110 Content-Type: application/json\r\n\
111 Content-Length: {}\r\n\
112 Connection: close\r\n\
113 \r\n\
114 {{\"key\": {}, \"count\": 0}}",
115 format!("{{\"key\": {}, \"count\": 0}}", key).len(),
116 key
117 )
118 }));
119
120 let increment_responses = increment_stream
122 .map(q!(|key| {
123 format!(
124 "HTTP/1.1 200 OK\r\n\
125 Content-Type: application/json\r\n\
126 Content-Length: {}\r\n\
127 Connection: close\r\n\
128 \r\n\
129 {{\"key\": {}, \"status\": \"incremented\"}}",
130 format!("{{\"key\": {}, \"status\": \"incremented\"}}", key).len(),
131 key
132 )
133 }))
134 .end_atomic();
135
136 let invalid_responses = invalid_requests.map(q!(|_raw_request| {
137 let error_body =
138 "{\"error\": \"Invalid request. Use POST /increment/{key} or GET /get/{key}\"}";
139 format!(
140 "HTTP/1.1 400 Bad Request\r\n\
141 Content-Type: application/json\r\n\
142 Content-Length: {}\r\n\
143 Connection: close\r\n\
144 \r\n\
145 {}",
146 error_body.len(),
147 error_body
148 )
149 }));
150
151 get_responses
152 .merge_unordered(increment_responses.into_keyed_stream())
153 .merge_unordered(invalid_responses.into_keyed_stream())
154}