1#![allow(
2 unused,
3 reason = "unused in trybuild but the __staged version is needed"
4)]
5#![allow(missing_docs, reason = "used internally")]
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::net::SocketAddr;
10use std::ops::{Deref, DerefMut};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::Duration;
15
16use bytes::BytesMut;
17use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
18use proc_macro2::Span;
19use sinktools::demux_map_lazy::LazyDemuxSink;
20use sinktools::lazy::{LazySink, LazySource};
21use sinktools::lazy_sink_source::LazySinkSource;
22use stageleft::runtime_support::{
23 FreeVariableWithContext, FreeVariableWithContextWithProps, QuoteTokens,
24};
25use stageleft::{QuotedWithContext, q};
26use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
27use tokio::net::{TcpListener, TcpStream};
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, debug, error, instrument, span, trace, trace_span};
30
31use crate::location::dynamic::LocationId;
32use crate::location::member_id::TaglessMemberId;
33use crate::location::{MemberId, MembershipEvent};
34
35pub fn deploy_containerized_o2o(target_container: &str, bind_port: u16) -> (syn::Expr, syn::Expr) {
36 (
37 q!(LazySink::<_, _, _, bytes::Bytes>::new(move || Box::pin(
38 async move {
39 let target_container = target_container;
40 let ip = self::resolve_container_ip(target_container).await;
41 let target = format!("{}:{}", ip, bind_port);
42 debug!(name: "connecting", %target, %target_container);
43
44 let stream = TcpStream::connect(&target).await?;
45
46 Result::<_, std::io::Error>::Ok(FramedWrite::new(
47 stream,
48 LengthDelimitedCodec::new(),
49 ))
50 }
51 )))
52 .splice_untyped_ctx(&()),
53 q!(LazySource::new(move || Box::pin(async move {
54 let bind_addr = format!("0.0.0.0:{}", bind_port);
55 let listener = TcpListener::bind(bind_addr).await?;
56 let (stream, peer) = listener.accept().await?;
57 debug!(name: "accepting", ?peer);
58 Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
59 })))
60 .splice_untyped_ctx(&()),
61 )
62}
63
64pub fn deploy_containerized_o2m(port: u16) -> (syn::Expr, syn::Expr) {
65 (
66 QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
67 q!(sinktools::demux_map_lazy::<_, _, _, _>(
68 move |key: &TaglessMemberId| {
69 let key = key.clone();
70
71 LazySink::<_, _, _, bytes::Bytes>::new(move || {
72 Box::pin(async move {
73 let port = port;
74 let container_name = key.get_container_name();
75 let ip = self::resolve_container_ip(&container_name).await;
76 let target = format!("{}:{}", ip, port);
77 debug!(name: "connecting", %target, %container_name);
78
79 let stream = TcpStream::connect(&target).await?;
80
81 let sink = FramedWrite::new(stream, LengthDelimitedCodec::new());
82 Result::<_, std::io::Error>::Ok(sink)
83 })
84 })
85 }
86 )),
87 &(),
88 ),
89 q!(LazySource::new(move || Box::pin(async move {
90 let bind_addr = format!("0.0.0.0:{}", port);
91 debug!(name: "listening", %bind_addr);
92 let listener = TcpListener::bind(bind_addr).await?;
93 let (stream, peer) = listener.accept().await?;
94 debug!(name: "accepting", ?peer);
95
96 Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
97 })))
98 .splice_untyped_ctx(&()),
99 )
100}
101
102pub fn deploy_containerized_m2o(port: u16, target_container: &str) -> (syn::Expr, syn::Expr) {
103 (
104 q!(LazySink::<_, _, _, bytes::Bytes>::new(move || {
105 Box::pin(async move {
106 let target_container = target_container;
107 let ip = self::resolve_container_ip(target_container).await;
108 let target = format!("{}:{}", ip, port);
109 debug!(name: "connecting", %target, %target_container);
110
111 let stream = TcpStream::connect(&target).await?;
112
113 let mut sink = FramedWrite::new(stream, LengthDelimitedCodec::new());
114
115 sink.send(bytes::Bytes::from(
116 bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
117 .unwrap(),
118 ))
119 .await?;
120
121 Result::<_, std::io::Error>::Ok(sink)
122 })
123 }))
124 .splice_untyped_ctx(&()),
125 QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
126 q!(LazySource::new(move || Box::pin(async move {
127 let bind_addr = format!("0.0.0.0:{}", port);
128 debug!(name: "listening", %bind_addr);
129 let listener = TcpListener::bind(bind_addr).await?;
130 Result::<_, std::io::Error>::Ok(
131 futures::stream::unfold(listener, |listener| {
132 Box::pin(async move {
133 let (stream, peer) = listener.accept().await.ok()?;
134 let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
135 let from =
136 bincode::deserialize::<String>(&source.next().await?.ok()?[..])
137 .ok()?;
138
139 debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
140
141 Some((
142 source.map(move |v| {
143 v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
144 }),
145 listener,
146 ))
147 })
148 })
149 .flatten_unordered(None),
150 )
151 }))),
152 &(),
153 ),
154 )
155}
156
157pub fn deploy_containerized_m2m(port: u16) -> (syn::Expr, syn::Expr) {
158 (
159 QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
160 q!(sinktools::demux_map_lazy::<_, _, _, _>(
161 move |key: &TaglessMemberId| {
162 let key = key.clone();
163
164 LazySink::<_, _, _, bytes::Bytes>::new(move || {
165 Box::pin(async move {
166 let port = port;
167 let container_name = key.get_container_name();
168 let ip = self::resolve_container_ip(&container_name).await;
169 let target = format!("{}:{}", ip, port);
170 debug!(name: "connecting", %target, %container_name);
171
172 let stream = TcpStream::connect(&target).await?;
173
174 let mut sink = FramedWrite::new(stream, LengthDelimitedCodec::new());
175 debug!(name: "connected", %target);
176
177 sink.send(bytes::Bytes::from(
178 bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
179 .unwrap(),
180 ))
181 .await?;
182
183 Result::<_, std::io::Error>::Ok(sink)
184 })
185 })
186 }
187 )),
188 &(),
189 ),
190 QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
191 q!(LazySource::new(move || Box::pin(async move {
192 let bind_addr = format!("0.0.0.0:{}", port);
193 debug!(name: "listening", %bind_addr);
194 let listener = TcpListener::bind(bind_addr).await?;
195
196 Result::<_, std::io::Error>::Ok(
197 futures::stream::unfold(listener, |listener| {
198 Box::pin(async move {
199 let (stream, peer) = listener.accept().await.ok()?;
200 let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
201 let from =
202 bincode::deserialize::<String>(&source.next().await?.ok()?[..])
203 .ok()?;
204
205 debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
206
207 Some((
208 source.map(move |v| {
209 v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
210 }),
211 listener,
212 ))
213 })
214 })
215 .flatten_unordered(None),
216 )
217 }))),
218 &(),
219 ),
220 )
221}
222
223pub struct SocketIdent {
224 pub socket_ident: syn::Ident,
225}
226
227impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for SocketIdent {
228 type O = TcpListener;
229
230 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
231 where
232 Self: Sized,
233 {
234 let ident = self.socket_ident;
235
236 (
237 QuoteTokens {
238 prelude: None,
239 expr: Some(quote::quote! { #ident }),
240 },
241 (),
242 )
243 }
244}
245
246pub fn deploy_containerized_external_sink_source_ident(
247 bind_addr: String,
248 socket_ident: syn::Ident,
249) -> syn::Expr {
250 let socket_ident = SocketIdent { socket_ident };
251
252 q!(LazySinkSource::<
253 _,
254 FramedRead<OwnedReadHalf, LengthDelimitedCodec>,
255 FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>,
256 bytes::Bytes,
257 std::io::Error,
259 >::new(async move {
260 let span = span!(tracing::Level::TRACE, "lazy_sink_source");
261 let guard = span.enter();
262 let bind_addr = bind_addr;
263 trace!(name: "attempting to accept from external", %bind_addr);
264 std::mem::drop(guard);
265 let (stream, peer) = socket_ident.accept().instrument(span.clone()).await?;
266 let guard = span.enter();
267
268 debug!(name: "external accepting", ?peer);
269 let (rx, tx) = stream.into_split();
270
271 let fr = FramedRead::new(rx, LengthDelimitedCodec::new());
272 let fw = FramedWrite::new(tx, LengthDelimitedCodec::new());
273
274 Result::<_, std::io::Error>::Ok((fr, fw))
275 },))
276 .splice_untyped_ctx(&())
277}
278
279pub fn cluster_ids<'a>() -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone {
280 q!(Box::leak(Box::new([TaglessMemberId::from_container_name(
284 "INVALID CONTAINER NAME cluster_ids"
285 )]))
286 .as_slice())
287}
288
289pub fn cluster_self_id<'a>() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
290 q!(TaglessMemberId::from_container_name(
291 std::env::var("CONTAINER_NAME").unwrap()
292 ))
293}
294
295pub fn cluster_membership_stream<'a>(
296 location_id: &LocationId,
297) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
298{
299 let raw_id = location_id.raw_id();
300
301 q!(Box::new(self::ecs_membership_stream(
302 std::env::var("DEPLOYMENT_INSTANCE").unwrap(),
303 raw_id
304 ))
305 as Box<
306 dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin,
307 >)
308}
309
310#[instrument(skip_all, fields(%deployment_instance, %location_id))]
311fn ecs_membership_stream(
312 deployment_instance: String,
313 location_id: usize,
314) -> impl Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin {
315 use std::collections::HashSet;
316
317 use futures::stream::{StreamExt, once};
318
319 let ecs_poller_span = trace_span!("ecs_poller");
320
321 let task_definition_arn_parser =
322 regex::Regex::new(r#"arn:aws:ecs:(?<region>.*):(?<account_id>.*):task-definition\/(?<container_id>hy-(?<type>[^-]+)-(?<image_id>[^-]+)-(?<deployment_id>[^-]+)-(?<location_id>[0-9]+)-(?<instance_id>.*)):.*"#).unwrap();
323
324 let poll_stream = futures::stream::unfold(
325 (HashSet::<String>::new(), deployment_instance, location_id),
326 move |(known_tasks, deployment_instance, location_id)| {
327 let task_definition_arn_parser = task_definition_arn_parser.clone();
328
329 async move {
330 trace!(name: "polling_ecs", known_task_count = known_tasks.len());
331
332 let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
333 let ecs_client = aws_sdk_ecs::Client::new(&config);
334
335 let cluster_name = format!("hydro-{}", deployment_instance);
336 trace!(name: "querying_tasks", %cluster_name, %location_id);
337
338 let tasks = match ecs_client
339 .list_tasks()
340 .cluster(&cluster_name)
341 .send()
342 .await
343 {
344 Ok(tasks) => tasks,
345 Err(e) => {
346 trace!(name: "list_tasks_error", error = %e);
347 tokio::time::sleep(Duration::from_secs(2)).await;
348 return Some((Vec::new(), (known_tasks, deployment_instance, location_id)));
349 }
350 };
351
352 let task_arns: Vec<String> = tasks.task_arns().iter().map(|s| s.to_string()).collect();
353 trace!(name: "tasks_found", task_count = task_arns.len());
354
355 let mut events = Vec::new();
356 let mut current_tasks = HashSet::<String>::new();
357
358 if !task_arns.is_empty() {
359 let task_details = match ecs_client
360 .describe_tasks()
361 .cluster(&cluster_name)
362 .set_tasks(Some(task_arns.clone()))
363 .send()
364 .await
365 {
366 Ok(details) => details,
367 Err(e) => {
368 trace!(name: "describe_tasks_error", error = %e);
369 tokio::time::sleep(Duration::from_secs(2)).await;
370 return Some((Vec::new(), (known_tasks, deployment_instance, location_id)));
371 }
372 };
373
374 for task in task_details.tasks() {
375 let Some(last_status) = task.last_status() else {
376 trace!(name: "task_status_missing", ?task);
377 continue;
378 };
379
380 trace!(name: "task_status", %last_status, ?task);
381
382 if last_status != "RUNNING" {
383 trace!(name: "task_not_running", %last_status, ?task);
384 continue;
385 }
386
387 let Some(task_def_arn) = task.task_definition_arn() else {
388 trace!(name: "task_def_arn_missing", ?task);
389 continue;
390 };
391
392 let Some(captures) = task_definition_arn_parser.captures(task_def_arn) else {
393 trace!(name: "task_def_arn_parse_error", %task_def_arn, ?task);
394 continue;
395 };
396
397 let Some(container_id) = captures.name("container_id") else {
398 trace!(name: "container_id_missing", %task_def_arn, ?task);
399 continue;
400 };
401 let container_id = container_id.as_str();
402
403 let Some(task_location_id) = captures.name("location_id") else {
404 trace!(name: "location_id_missing", %task_def_arn, ?task);
405 continue;
406 };
407 let task_location_id: usize = match task_location_id.as_str().parse() {
408 Ok(id) => id,
409 Err(_) => {
410 trace!(name: "location_id_parse_error", %task_def_arn, ?task);
411 continue;
412 }
413 };
414
415 if task_location_id != location_id {
417 trace!(name: "location_id_mismatch", %task_location_id, %location_id, %container_id);
418 continue;
419 }
420
421 trace!(name: "running_task", %container_id);
423 current_tasks.insert(container_id.to_string());
424 if !known_tasks.contains(container_id) {
425 trace!(name: "container_joined", %container_id);
426 events.push((container_id.to_string(), MembershipEvent::Joined));
427 }
428 }
429 }
430
431 for container_id in known_tasks.iter() {
432 if !current_tasks.contains(container_id) {
433 trace!(name: "container_left", %container_id);
434 events.push((container_id.to_owned(), MembershipEvent::Left));
435 }
436 }
437
438 trace!(name: "poll_complete", event_count = events.len(), current_task_count = current_tasks.len());
439 tokio::time::sleep(Duration::from_secs(2)).await;
440
441 Some((events, (current_tasks, deployment_instance, location_id)))
442 }.instrument(ecs_poller_span.clone())
443 }
444 )
445 .flat_map(futures::stream::iter);
446
447 Box::pin(
448 poll_stream
449 .map(|(k, v)| (TaglessMemberId::from_container_name(k), v))
450 .inspect(|(member_id, event)| trace!(name: "membership_event", ?member_id, ?event)),
451 )
452}
453
454async fn resolve_container_ip(container_name: &str) -> String {
456 let deployment_instance = std::env::var("DEPLOYMENT_INSTANCE").unwrap();
457 let cluster_name = format!("hydro-{}", deployment_instance);
458
459 let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
460 let ecs_client = aws_sdk_ecs::Client::new(&config);
461
462 loop {
463 let tasks = match ecs_client
464 .list_tasks()
465 .cluster(&cluster_name)
466 .family(container_name)
467 .send()
468 .await
469 {
470 Ok(t) => t,
471 Err(e) => {
472 trace!(name: "resolve_ip_list_error", %container_name, error = %e);
473 tokio::time::sleep(Duration::from_secs(1)).await;
474 continue;
475 }
476 };
477
478 let Some(task_arn) = tasks.task_arns().first() else {
479 trace!(name: "resolve_ip_no_task", %container_name);
480 tokio::time::sleep(Duration::from_secs(1)).await;
481 continue;
482 };
483
484 let task_details = match ecs_client
485 .describe_tasks()
486 .cluster(&cluster_name)
487 .tasks(task_arn)
488 .send()
489 .await
490 {
491 Ok(d) => d,
492 Err(e) => {
493 trace!(name: "resolve_ip_describe_error", %container_name, error = %e);
494 tokio::time::sleep(Duration::from_secs(1)).await;
495 continue;
496 }
497 };
498
499 if let Some(task) = task_details.tasks().first() {
500 if let Some(ip) = task
502 .attachments()
503 .iter()
504 .flat_map(|a| a.details())
505 .find(|d| d.name() == Some("privateIPv4Address"))
506 .and_then(|d| d.value())
507 {
508 trace!(name: "resolved_ip", %container_name, %ip);
509 return ip.to_string();
510 }
511 }
512
513 trace!(name: "resolve_ip_no_ip", %container_name);
514 tokio::time::sleep(Duration::from_secs(1)).await;
515 }
516}