hydro_test/external_client/
http_counter.rs

1use hydro_lang::keyed_stream::KeyedStream;
2use hydro_lang::*;
3
4#[derive(Debug, Clone)]
5pub enum RequestType {
6    Increment { key: i32 },
7    Get { key: i32 },
8    Invalid,
9}
10
11#[derive(Debug, Clone)]
12pub struct ParsedRequest {
13    pub connection_id: u64,
14    pub request_type: RequestType,
15    pub raw_request: String,
16}
17
18pub fn http_counter_server<'a, P>(
19    in_stream: KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder>,
20    process: &Process<'a, P>,
21) -> KeyedStream<u64, String, Process<'a, P>, Unbounded, NoOrder> {
22    let parsed_requests = in_stream
23        .fold_early_stop(
24            q!(|| String::new()),
25            q!(|buffer, line| {
26                buffer.push_str(&line);
27                buffer.push_str("\r\n");
28                // Check if this is an empty line (end of HTTP headers)
29                line.trim().is_empty()
30            }),
31        )
32        .map_with_key(q!(|(connection_id, raw_request)| {
33            let lines: Vec<&str> = raw_request.lines().collect();
34            let request_line = lines.first().unwrap_or(&"");
35            let parts: Vec<&str> = request_line.split_whitespace().collect();
36            let method = parts.first().unwrap_or(&"GET");
37            let path = parts.get(1).unwrap_or(&"/");
38
39            let request_type = if method == &"POST" && path.starts_with("/increment/") {
40                if let Ok(key) = path[11..].parse::<i32>() {
41                    RequestType::Increment { key }
42                } else {
43                    RequestType::Invalid
44                }
45            } else if method == &"GET" && path.starts_with("/get/") {
46                if let Ok(key) = path[5..].parse::<i32>() {
47                    RequestType::Get { key }
48                } else {
49                    RequestType::Invalid
50                }
51            } else {
52                RequestType::Invalid
53            };
54
55            ParsedRequest {
56                connection_id,
57                request_type,
58                raw_request,
59            }
60        }));
61
62    let increment_lookup_tick = process.tick();
63    let increment_stream = parsed_requests
64        .clone()
65        .filter_map(q!(|req| match req.request_type {
66            RequestType::Increment { key } => Some(key),
67            _ => None,
68        }))
69        .atomic(&increment_lookup_tick);
70
71    let get_stream = parsed_requests
72        .clone()
73        .filter_map(q!(|req| match req.request_type {
74            RequestType::Get { key } => Some(key),
75            _ => None,
76        }));
77
78    let invalid_requests = parsed_requests.filter_map(q!(|req| match req.request_type {
79        RequestType::Invalid => Some(req.raw_request),
80        _ => None,
81    }));
82
83    let counters = increment_stream
84        .clone()
85        .values()
86        .map(q!(|key| (key, ())))
87        .into_keyed()
88        .fold_commutative(q!(|| 0i32), q!(|acc, _| *acc += 1));
89
90    let lookup_result = get_stream
91        .batch(&increment_lookup_tick, nondet!(/** batch get requests */))
92        .get_from(counters.snapshot(nondet!(/** intentional non-determinism for get timing */)));
93    let get_responses = lookup_result
94        .clone()
95        .map(q!(|(key, maybe_count)| {
96            if let Some(count) = maybe_count {
97                format!(
98                    "HTTP/1.1 200 OK\r\n\
99                    Content-Type: application/json\r\n\
100                    Content-Length: {}\r\n\
101                    Connection: close\r\n\
102                    \r\n\
103                    {{\"key\": {}, \"count\": {}}}",
104                    format!("{{\"key\": {}, \"count\": {}}}", key, count).len(),
105                    key,
106                    count
107                )
108            } else {
109                format!(
110                    "HTTP/1.1 200 OK\r\n\
111                        Content-Type: application/json\r\n\
112                        Content-Length: {}\r\n\
113                        Connection: close\r\n\
114                        \r\n\
115                        {{\"key\": {}, \"count\": 0}}",
116                    format!("{{\"key\": {}, \"count\": 0}}", key).len(),
117                    key
118                )
119            }
120        }))
121        .into_keyed_stream()
122        .all_ticks();
123
124    // Handle increment responses (just acknowledge)
125    let increment_responses = increment_stream
126        .map(q!(|key| {
127            format!(
128                "HTTP/1.1 200 OK\r\n\
129                 Content-Type: application/json\r\n\
130                 Content-Length: {}\r\n\
131                 Connection: close\r\n\
132                 \r\n\
133                 {{\"key\": {}, \"status\": \"incremented\"}}",
134                format!("{{\"key\": {}, \"status\": \"incremented\"}}", key).len(),
135                key
136            )
137        }))
138        .end_atomic();
139
140    let invalid_responses = invalid_requests.map(q!(|_raw_request| {
141        let error_body =
142            "{\"error\": \"Invalid request. Use POST /increment/{key} or GET /get/{key}\"}";
143        format!(
144            "HTTP/1.1 400 Bad Request\r\n\
145                 Content-Type: application/json\r\n\
146                 Content-Length: {}\r\n\
147                 Connection: close\r\n\
148                 \r\n\
149                 {}",
150            error_body.len(),
151            error_body
152        )
153    }));
154
155    get_responses
156        .interleave(increment_responses.into_keyed_stream())
157        .interleave(invalid_responses.into_keyed_stream())
158}