hydro_test/external_client/
http_counter.rs1use hydro_lang::keyed_stream::KeyedStream;
2use hydro_lang::*;
3
/// The operation a client asked for, as decoded from the HTTP request line.
///
/// Derives equality and hashing so that parsed requests can be compared in
/// assertions and used as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RequestType {
    /// Request to bump the counter stored under `key`.
    Increment { key: i32 },
    /// Request to read the counter stored under `key`.
    Get { key: i32 },
    /// Any request that is neither a well-formed increment nor a well-formed get.
    Invalid,
}
10
11#[derive(Debug, Clone)]
12pub struct ParsedRequest {
13 pub connection_id: u64,
14 pub request_type: RequestType,
15 pub raw_request: String,
16}
17
18pub fn http_counter_server<'a, P>(
19 in_stream: KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder>,
20 process: &Process<'a, P>,
21) -> KeyedStream<u64, String, Process<'a, P>, Unbounded, NoOrder> {
22 let parsed_requests = in_stream
23 .fold_early_stop(
24 q!(|| String::new()),
25 q!(|buffer, line| {
26 buffer.push_str(&line);
27 buffer.push_str("\r\n");
28 line.trim().is_empty()
30 }),
31 )
32 .map_with_key(q!(|(connection_id, raw_request)| {
33 let lines: Vec<&str> = raw_request.lines().collect();
34 let request_line = lines.first().unwrap_or(&"");
35 let parts: Vec<&str> = request_line.split_whitespace().collect();
36 let method = parts.first().unwrap_or(&"GET");
37 let path = parts.get(1).unwrap_or(&"/");
38
39 let request_type = if method == &"POST" && path.starts_with("/increment/") {
40 if let Ok(key) = path[11..].parse::<i32>() {
41 RequestType::Increment { key }
42 } else {
43 RequestType::Invalid
44 }
45 } else if method == &"GET" && path.starts_with("/get/") {
46 if let Ok(key) = path[5..].parse::<i32>() {
47 RequestType::Get { key }
48 } else {
49 RequestType::Invalid
50 }
51 } else {
52 RequestType::Invalid
53 };
54
55 ParsedRequest {
56 connection_id,
57 request_type,
58 raw_request,
59 }
60 }));
61
62 let increment_lookup_tick = process.tick();
63 let increment_stream = parsed_requests
64 .clone()
65 .filter_map(q!(|req| match req.request_type {
66 RequestType::Increment { key } => Some(key),
67 _ => None,
68 }))
69 .atomic(&increment_lookup_tick);
70
71 let get_stream = parsed_requests
72 .clone()
73 .filter_map(q!(|req| match req.request_type {
74 RequestType::Get { key } => Some(key),
75 _ => None,
76 }));
77
78 let invalid_requests = parsed_requests.filter_map(q!(|req| match req.request_type {
79 RequestType::Invalid => Some(req.raw_request),
80 _ => None,
81 }));
82
83 let counters = increment_stream
84 .clone()
85 .values()
86 .map(q!(|key| (key, ())))
87 .into_keyed()
88 .fold_commutative(q!(|| 0i32), q!(|acc, _| *acc += 1));
89
90 let lookup_result = get_stream
91 .batch(&increment_lookup_tick, nondet!())
92 .get_from(counters.snapshot(nondet!()));
93 let get_responses = lookup_result
94 .clone()
95 .map(q!(|(key, maybe_count)| {
96 if let Some(count) = maybe_count {
97 format!(
98 "HTTP/1.1 200 OK\r\n\
99 Content-Type: application/json\r\n\
100 Content-Length: {}\r\n\
101 Connection: close\r\n\
102 \r\n\
103 {{\"key\": {}, \"count\": {}}}",
104 format!("{{\"key\": {}, \"count\": {}}}", key, count).len(),
105 key,
106 count
107 )
108 } else {
109 format!(
110 "HTTP/1.1 200 OK\r\n\
111 Content-Type: application/json\r\n\
112 Content-Length: {}\r\n\
113 Connection: close\r\n\
114 \r\n\
115 {{\"key\": {}, \"count\": 0}}",
116 format!("{{\"key\": {}, \"count\": 0}}", key).len(),
117 key
118 )
119 }
120 }))
121 .into_keyed_stream()
122 .all_ticks();
123
124 let increment_responses = increment_stream
126 .map(q!(|key| {
127 format!(
128 "HTTP/1.1 200 OK\r\n\
129 Content-Type: application/json\r\n\
130 Content-Length: {}\r\n\
131 Connection: close\r\n\
132 \r\n\
133 {{\"key\": {}, \"status\": \"incremented\"}}",
134 format!("{{\"key\": {}, \"status\": \"incremented\"}}", key).len(),
135 key
136 )
137 }))
138 .end_atomic();
139
140 let invalid_responses = invalid_requests.map(q!(|_raw_request| {
141 let error_body =
142 "{\"error\": \"Invalid request. Use POST /increment/{key} or GET /get/{key}\"}";
143 format!(
144 "HTTP/1.1 400 Bad Request\r\n\
145 Content-Type: application/json\r\n\
146 Content-Length: {}\r\n\
147 Connection: close\r\n\
148 \r\n\
149 {}",
150 error_body.len(),
151 error_body
152 )
153 }));
154
155 get_responses
156 .interleave(increment_responses.into_keyed_stream())
157 .interleave(invalid_responses.into_keyed_stream())
158}