Skip to main content

hydro_test/external_client/
http_counter.rs

1use hydro_lang::live_collections::stream::{NoOrder, TotalOrder};
2use hydro_lang::prelude::*;
3
/// The kind of operation an incoming HTTP request asks the counter server to perform.
#[derive(Clone, Debug)]
pub enum RequestType {
    /// `POST /increment/{key}`: bump the counter stored under `key`.
    Increment {
        /// Counter key parsed from the request path.
        key: i32,
    },
    /// `GET /get/{key}`: read the current value of the counter stored under `key`.
    Get {
        /// Counter key parsed from the request path.
        key: i32,
    },
    /// Any request that is neither a well-formed increment nor a well-formed get.
    Invalid,
}
10
11#[derive(Debug, Clone)]
12pub struct ParsedRequest {
13    pub connection_id: u64,
14    pub request_type: RequestType,
15    pub raw_request: String,
16}
17
18pub fn http_counter_server<'a, P>(
19    in_stream: KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder>,
20) -> KeyedStream<u64, String, Process<'a, P>, Unbounded, NoOrder> {
21    let parsed_requests = in_stream
22        .fold_early_stop(
23            q!(|| String::new()),
24            q!(|buffer, line| {
25                buffer.push_str(&line);
26                buffer.push_str("\r\n");
27                // Check if this is an empty line (end of HTTP headers)
28                line.trim().is_empty()
29            }),
30        )
31        .map_with_key(q!(|(connection_id, raw_request)| {
32            let lines: Vec<&str> = raw_request.lines().collect();
33            let request_line = lines.first().unwrap_or(&"");
34            let parts: Vec<&str> = request_line.split_whitespace().collect();
35            let method = parts.first().unwrap_or(&"GET");
36            let path = parts.get(1).unwrap_or(&"/");
37
38            let request_type = if method == &"POST" && path.starts_with("/increment/") {
39                if let Ok(key) = path[11..].parse::<i32>() {
40                    RequestType::Increment { key }
41                } else {
42                    RequestType::Invalid
43                }
44            } else if method == &"GET" && path.starts_with("/get/") {
45                if let Ok(key) = path[5..].parse::<i32>() {
46                    RequestType::Get { key }
47                } else {
48                    RequestType::Invalid
49                }
50            } else {
51                RequestType::Invalid
52            };
53
54            ParsedRequest {
55                connection_id,
56                request_type,
57                raw_request,
58            }
59        }));
60
61    let increment_stream = parsed_requests
62        .clone()
63        .filter_map(q!(|req| match req.request_type {
64            RequestType::Increment { key } => Some(key),
65            _ => None,
66        }))
67        .atomic();
68
69    let get_stream = parsed_requests
70        .clone()
71        .filter_map(q!(|req| match req.request_type {
72            RequestType::Get { key } => Some(key),
73            _ => None,
74        }));
75
76    let invalid_requests = parsed_requests.filter_map(q!(|req| match req.request_type {
77        RequestType::Invalid => Some(req.raw_request),
78        _ => None,
79    }));
80
81    let counters = increment_stream
82        .clone()
83        .values()
84        .map(q!(|key| (key, ())))
85        .into_keyed()
86        .value_counts();
87
88    let lookup_result = sliced! {
89        let batch_get_requests = use(get_stream, nondet!(/** batch get requests */));
90        let cur_counters = use::atomic(counters, nondet!(/** intentional non-determinism for get timing */));
91
92        batch_get_requests.lookup_keyed_singleton(cur_counters).into_keyed_stream()
93    };
94    let get_responses =
95        lookup_result.map(q!(|(key, maybe_count)| if let Some(count) = maybe_count {
96            format!(
97                "HTTP/1.1 200 OK\r\n\
98                    Content-Type: application/json\r\n\
99                    Content-Length: {}\r\n\
100                    Connection: close\r\n\
101                    \r\n\
102                    {{\"key\": {}, \"count\": {}}}",
103                format!("{{\"key\": {}, \"count\": {}}}", key, count).len(),
104                key,
105                count
106            )
107        } else {
108            format!(
109                "HTTP/1.1 200 OK\r\n\
110                        Content-Type: application/json\r\n\
111                        Content-Length: {}\r\n\
112                        Connection: close\r\n\
113                        \r\n\
114                        {{\"key\": {}, \"count\": 0}}",
115                format!("{{\"key\": {}, \"count\": 0}}", key).len(),
116                key
117            )
118        }));
119
120    // Handle increment responses (just acknowledge)
121    let increment_responses = increment_stream
122        .map(q!(|key| {
123            format!(
124                "HTTP/1.1 200 OK\r\n\
125                 Content-Type: application/json\r\n\
126                 Content-Length: {}\r\n\
127                 Connection: close\r\n\
128                 \r\n\
129                 {{\"key\": {}, \"status\": \"incremented\"}}",
130                format!("{{\"key\": {}, \"status\": \"incremented\"}}", key).len(),
131                key
132            )
133        }))
134        .end_atomic();
135
136    let invalid_responses = invalid_requests.map(q!(|_raw_request| {
137        let error_body =
138            "{\"error\": \"Invalid request. Use POST /increment/{key} or GET /get/{key}\"}";
139        format!(
140            "HTTP/1.1 400 Bad Request\r\n\
141                 Content-Type: application/json\r\n\
142                 Content-Length: {}\r\n\
143                 Connection: close\r\n\
144                 \r\n\
145                 {}",
146            error_body.len(),
147            error_body
148        )
149    }));
150
151    get_responses
152        .merge_unordered(increment_responses.into_keyed_stream())
153        .merge_unordered(invalid_responses.into_keyed_stream())
154}