hydro_lang/deploy/
deploy_runtime_containerized_ecs.rs

1#![allow(
2    unused,
3    reason = "unused in trybuild but the __staged version is needed"
4)]
5#![allow(missing_docs, reason = "used internally")]
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::net::SocketAddr;
10use std::ops::{Deref, DerefMut};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::Duration;
15
16use bytes::BytesMut;
17use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
18use proc_macro2::Span;
19use sinktools::demux_map_lazy::LazyDemuxSink;
20use sinktools::lazy::{LazySink, LazySource};
21use sinktools::lazy_sink_source::LazySinkSource;
22use stageleft::runtime_support::{
23    FreeVariableWithContext, FreeVariableWithContextWithProps, QuoteTokens,
24};
25use stageleft::{QuotedWithContext, q};
26use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
27use tokio::net::{TcpListener, TcpStream};
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, debug, error, instrument, span, trace, trace_span};
30
31use crate::location::dynamic::LocationId;
32use crate::location::member_id::TaglessMemberId;
33use crate::location::{MemberId, MembershipEvent};
34
35pub fn deploy_containerized_o2o(target_container: &str, bind_port: u16) -> (syn::Expr, syn::Expr) {
36    (
37        q!(LazySink::<_, _, _, bytes::Bytes>::new(move || Box::pin(
38            async move {
39                let target_container = target_container;
40                let ip = self::resolve_container_ip(target_container).await;
41                let target = format!("{}:{}", ip, bind_port);
42                debug!(name: "connecting", %target, %target_container);
43
44                let stream = TcpStream::connect(&target).await?;
45
46                Result::<_, std::io::Error>::Ok(FramedWrite::new(
47                    stream,
48                    LengthDelimitedCodec::new(),
49                ))
50            }
51        )))
52        .splice_untyped_ctx(&()),
53        q!(LazySource::new(move || Box::pin(async move {
54            let bind_addr = format!("0.0.0.0:{}", bind_port);
55            let listener = TcpListener::bind(bind_addr).await?;
56            let (stream, peer) = listener.accept().await?;
57            debug!(name: "accepting", ?peer);
58            Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
59        })))
60        .splice_untyped_ctx(&()),
61    )
62}
63
64pub fn deploy_containerized_o2m(port: u16) -> (syn::Expr, syn::Expr) {
65    (
66        QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
67            q!(sinktools::demux_map_lazy::<_, _, _, _>(
68                move |key: &TaglessMemberId| {
69                    let key = key.clone();
70
71                    LazySink::<_, _, _, bytes::Bytes>::new(move || {
72                        Box::pin(async move {
73                            let port = port;
74                            let container_name = key.get_container_name();
75                            let ip = self::resolve_container_ip(&container_name).await;
76                            let target = format!("{}:{}", ip, port);
77                            debug!(name: "connecting", %target, %container_name);
78
79                            let stream = TcpStream::connect(&target).await?;
80
81                            let sink = FramedWrite::new(stream, LengthDelimitedCodec::new());
82                            Result::<_, std::io::Error>::Ok(sink)
83                        })
84                    })
85                }
86            )),
87            &(),
88        ),
89        q!(LazySource::new(move || Box::pin(async move {
90            let bind_addr = format!("0.0.0.0:{}", port);
91            debug!(name: "listening", %bind_addr);
92            let listener = TcpListener::bind(bind_addr).await?;
93            let (stream, peer) = listener.accept().await?;
94            debug!(name: "accepting", ?peer);
95
96            Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
97        })))
98        .splice_untyped_ctx(&()),
99    )
100}
101
102pub fn deploy_containerized_m2o(port: u16, target_container: &str) -> (syn::Expr, syn::Expr) {
103    (
104        q!(LazySink::<_, _, _, bytes::Bytes>::new(move || {
105            Box::pin(async move {
106                let target_container = target_container;
107                let ip = self::resolve_container_ip(target_container).await;
108                let target = format!("{}:{}", ip, port);
109                debug!(name: "connecting", %target, %target_container);
110
111                let stream = TcpStream::connect(&target).await?;
112
113                let mut sink = FramedWrite::new(stream, LengthDelimitedCodec::new());
114
115                sink.send(bytes::Bytes::from(
116                    bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
117                        .unwrap(),
118                ))
119                .await?;
120
121                Result::<_, std::io::Error>::Ok(sink)
122            })
123        }))
124        .splice_untyped_ctx(&()),
125        QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
126            q!(LazySource::new(move || Box::pin(async move {
127                let bind_addr = format!("0.0.0.0:{}", port);
128                debug!(name: "listening", %bind_addr);
129                let listener = TcpListener::bind(bind_addr).await?;
130                Result::<_, std::io::Error>::Ok(
131                    futures::stream::unfold(listener, |listener| {
132                        Box::pin(async move {
133                            let (stream, peer) = listener.accept().await.ok()?;
134                            let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
135                            let from =
136                                bincode::deserialize::<String>(&source.next().await?.ok()?[..])
137                                    .ok()?;
138
139                            debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
140
141                            Some((
142                                source.map(move |v| {
143                                    v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
144                                }),
145                                listener,
146                            ))
147                        })
148                    })
149                    .flatten_unordered(None),
150                )
151            }))),
152            &(),
153        ),
154    )
155}
156
157pub fn deploy_containerized_m2m(port: u16) -> (syn::Expr, syn::Expr) {
158    (
159        QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
160            q!(sinktools::demux_map_lazy::<_, _, _, _>(
161                move |key: &TaglessMemberId| {
162                    let key = key.clone();
163
164                    LazySink::<_, _, _, bytes::Bytes>::new(move || {
165                        Box::pin(async move {
166                            let port = port;
167                            let container_name = key.get_container_name();
168                            let ip = self::resolve_container_ip(&container_name).await;
169                            let target = format!("{}:{}", ip, port);
170                            debug!(name: "connecting", %target, %container_name);
171
172                            let stream = TcpStream::connect(&target).await?;
173
174                            let mut sink = FramedWrite::new(stream, LengthDelimitedCodec::new());
175                            debug!(name: "connected", %target);
176
177                            sink.send(bytes::Bytes::from(
178                                bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
179                                    .unwrap(),
180                            ))
181                            .await?;
182
183                            Result::<_, std::io::Error>::Ok(sink)
184                        })
185                    })
186                }
187            )),
188            &(),
189        ),
190        QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
191            q!(LazySource::new(move || Box::pin(async move {
192                let bind_addr = format!("0.0.0.0:{}", port);
193                debug!(name: "listening", %bind_addr);
194                let listener = TcpListener::bind(bind_addr).await?;
195
196                Result::<_, std::io::Error>::Ok(
197                    futures::stream::unfold(listener, |listener| {
198                        Box::pin(async move {
199                            let (stream, peer) = listener.accept().await.ok()?;
200                            let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
201                            let from =
202                                bincode::deserialize::<String>(&source.next().await?.ok()?[..])
203                                    .ok()?;
204
205                            debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
206
207                            Some((
208                                source.map(move |v| {
209                                    v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
210                                }),
211                                listener,
212                            ))
213                        })
214                    })
215                    .flatten_unordered(None),
216                )
217            }))),
218            &(),
219        ),
220    )
221}
222
/// Free variable that splices a bare identifier into quoted code, typed as a `TcpListener`
/// (see the `FreeVariableWithContextWithProps` impl for this type).
pub struct SocketIdent {
    /// Identifier emitted verbatim as the spliced expression.
    pub socket_ident: syn::Ident,
}
226
227impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for SocketIdent {
228    type O = TcpListener;
229
230    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
231    where
232        Self: Sized,
233    {
234        let ident = self.socket_ident;
235
236        (
237            QuoteTokens {
238                prelude: None,
239                expr: Some(quote::quote! { #ident }),
240            },
241            (),
242        )
243    }
244}
245
246pub fn deploy_containerized_external_sink_source_ident(
247    bind_addr: String,
248    socket_ident: syn::Ident,
249) -> syn::Expr {
250    let socket_ident = SocketIdent { socket_ident };
251
252    q!(LazySinkSource::<
253        _,
254        FramedRead<OwnedReadHalf, LengthDelimitedCodec>,
255        FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>,
256        bytes::Bytes,
257        // Result<bytes::BytesMut, std::io::Error>,
258        std::io::Error,
259    >::new(async move {
260        let span = span!(tracing::Level::TRACE, "lazy_sink_source");
261        let guard = span.enter();
262        let bind_addr = bind_addr;
263        trace!(name: "attempting to accept from external", %bind_addr);
264        std::mem::drop(guard);
265        let (stream, peer) = socket_ident.accept().instrument(span.clone()).await?;
266        let guard = span.enter();
267
268        debug!(name: "external accepting", ?peer);
269        let (rx, tx) = stream.into_split();
270
271        let fr = FramedRead::new(rx, LengthDelimitedCodec::new());
272        let fw = FramedWrite::new(tx, LengthDelimitedCodec::new());
273
274        Result::<_, std::io::Error>::Ok((fr, fw))
275    },))
276    .splice_untyped_ctx(&())
277}
278
/// Quoted placeholder for the static list of cluster member ids.
///
/// The quoted expression leaks a one-element boxed array holding a deliberately invalid
/// container name and returns it as a slice.
pub fn cluster_ids<'a>() -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone {
    // The original author marks this as unused.
    // This is a dummy piece of code, since clusters are dynamic when containerized.
    q!(Box::leak(Box::new([TaglessMemberId::from_container_name(
        "INVALID CONTAINER NAME cluster_ids"
    )]))
    .as_slice())
}
288
/// Quoted expression that yields this process's own member id.
///
/// The id is built from the `CONTAINER_NAME` environment variable; the quoted code
/// panics if that variable cannot be read.
pub fn cluster_self_id<'a>() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
    q!(TaglessMemberId::from_container_name(
        std::env::var("CONTAINER_NAME").unwrap()
    ))
}
294
/// Quoted expression that yields the membership event stream for `location_id`.
///
/// The quoted code calls `ecs_membership_stream` with the `DEPLOYMENT_INSTANCE` environment
/// variable and the location's raw id, and boxes the result as a trait object. It panics if
/// `DEPLOYMENT_INSTANCE` cannot be read.
pub fn cluster_membership_stream<'a>(
    location_id: &LocationId,
) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
{
    // Extract the raw id here so the quoted code captures a plain value.
    let raw_id = location_id.raw_id();

    q!(Box::new(self::ecs_membership_stream(
        std::env::var("DEPLOYMENT_INSTANCE").unwrap(),
        raw_id
    ))
        as Box<
            dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin,
        >)
}
309
310#[instrument(skip_all, fields(%deployment_instance, %location_id))]
311fn ecs_membership_stream(
312    deployment_instance: String,
313    location_id: usize,
314) -> impl Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin {
315    use std::collections::HashSet;
316
317    use futures::stream::{StreamExt, once};
318
319    let ecs_poller_span = trace_span!("ecs_poller");
320
321    let task_definition_arn_parser =
322        regex::Regex::new(r#"arn:aws:ecs:(?<region>.*):(?<account_id>.*):task-definition\/(?<container_id>hy-(?<type>[^-]+)-(?<image_id>[^-]+)-(?<deployment_id>[^-]+)-(?<location_id>[0-9]+)-(?<instance_id>.*)):.*"#).unwrap();
323
324    let poll_stream = futures::stream::unfold(
325        (HashSet::<String>::new(), deployment_instance, location_id),
326        move |(known_tasks, deployment_instance, location_id)| {
327            let task_definition_arn_parser = task_definition_arn_parser.clone();
328
329            async move {
330                trace!(name: "polling_ecs", known_task_count = known_tasks.len());
331
332                let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
333                let ecs_client = aws_sdk_ecs::Client::new(&config);
334
335                let cluster_name = format!("hydro-{}", deployment_instance);
336                trace!(name: "querying_tasks", %cluster_name, %location_id);
337
338                let tasks = match ecs_client
339                    .list_tasks()
340                    .cluster(&cluster_name)
341                    .send()
342                    .await
343                {
344                    Ok(tasks) => tasks,
345                    Err(e) => {
346                        trace!(name: "list_tasks_error", error = %e);
347                        tokio::time::sleep(Duration::from_secs(2)).await;
348                        return Some((Vec::new(), (known_tasks, deployment_instance, location_id)));
349                    }
350                };
351
352                let task_arns: Vec<String> = tasks.task_arns().iter().map(|s| s.to_string()).collect();
353                trace!(name: "tasks_found", task_count = task_arns.len());
354
355                let mut events = Vec::new();
356                let mut current_tasks = HashSet::<String>::new();
357
358                if !task_arns.is_empty() {
359                    let task_details = match ecs_client
360                        .describe_tasks()
361                        .cluster(&cluster_name)
362                        .set_tasks(Some(task_arns.clone()))
363                        .send()
364                        .await
365                    {
366                        Ok(details) => details,
367                        Err(e) => {
368                            trace!(name: "describe_tasks_error", error = %e);
369                            tokio::time::sleep(Duration::from_secs(2)).await;
370                            return Some((Vec::new(), (known_tasks, deployment_instance, location_id)));
371                        }
372                    };
373
374                    for task in task_details.tasks() {
375                        let Some(last_status) = task.last_status() else {
376                            trace!(name: "task_status_missing", ?task);
377                            continue;
378                        };
379
380                        trace!(name: "task_status", %last_status, ?task);
381
382                        if last_status != "RUNNING" {
383                            trace!(name: "task_not_running", %last_status, ?task);
384                            continue;
385                        }
386
387                        let Some(task_def_arn) = task.task_definition_arn() else {
388                            trace!(name: "task_def_arn_missing", ?task);
389                            continue;
390                        };
391
392                        let Some(captures) = task_definition_arn_parser.captures(task_def_arn) else {
393                            trace!(name: "task_def_arn_parse_error", %task_def_arn, ?task);
394                            continue;
395                        };
396
397                        let Some(container_id) = captures.name("container_id") else {
398                            trace!(name: "container_id_missing", %task_def_arn, ?task);
399                            continue;
400                        };
401                        let container_id = container_id.as_str();
402
403                        let Some(task_location_id) = captures.name("location_id") else {
404                            trace!(name: "location_id_missing", %task_def_arn, ?task);
405                            continue;
406                        };
407                        let task_location_id: usize = match task_location_id.as_str().parse() {
408                            Ok(id) => id,
409                            Err(_) => {
410                                trace!(name: "location_id_parse_error", %task_def_arn, ?task);
411                                continue;
412                            }
413                        };
414
415                        // Filter by location_id - only include tasks for this specific cluster
416                        if task_location_id != location_id {
417                            trace!(name: "location_id_mismatch", %task_location_id, %location_id, %container_id);
418                            continue;
419                        }
420
421                        // Use container_id directly (not DNS name)
422                        trace!(name: "running_task", %container_id);
423                        current_tasks.insert(container_id.to_string());
424                        if !known_tasks.contains(container_id) {
425                            trace!(name: "container_joined", %container_id);
426                            events.push((container_id.to_string(), MembershipEvent::Joined));
427                        }
428                    }
429                }
430
431                for container_id in known_tasks.iter() {
432                    if !current_tasks.contains(container_id) {
433                        trace!(name: "container_left", %container_id);
434                        events.push((container_id.to_owned(), MembershipEvent::Left));
435                    }
436                }
437
438                trace!(name: "poll_complete", event_count = events.len(), current_task_count = current_tasks.len());
439                tokio::time::sleep(Duration::from_secs(2)).await;
440
441                Some((events, (current_tasks, deployment_instance, location_id)))
442            }.instrument(ecs_poller_span.clone())
443        }
444    )
445    .flat_map(futures::stream::iter);
446
447    Box::pin(
448        poll_stream
449            .map(|(k, v)| (TaglessMemberId::from_container_name(k), v))
450            .inspect(|(member_id, event)| trace!(name: "membership_event", ?member_id, ?event)),
451    )
452}
453
454/// Resolve a container name to its private IP address via ECS API
455async fn resolve_container_ip(container_name: &str) -> String {
456    let deployment_instance = std::env::var("DEPLOYMENT_INSTANCE").unwrap();
457    let cluster_name = format!("hydro-{}", deployment_instance);
458
459    let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
460    let ecs_client = aws_sdk_ecs::Client::new(&config);
461
462    loop {
463        let tasks = match ecs_client
464            .list_tasks()
465            .cluster(&cluster_name)
466            .family(container_name)
467            .send()
468            .await
469        {
470            Ok(t) => t,
471            Err(e) => {
472                trace!(name: "resolve_ip_list_error", %container_name, error = %e);
473                tokio::time::sleep(Duration::from_secs(1)).await;
474                continue;
475            }
476        };
477
478        let Some(task_arn) = tasks.task_arns().first() else {
479            trace!(name: "resolve_ip_no_task", %container_name);
480            tokio::time::sleep(Duration::from_secs(1)).await;
481            continue;
482        };
483
484        let task_details = match ecs_client
485            .describe_tasks()
486            .cluster(&cluster_name)
487            .tasks(task_arn)
488            .send()
489            .await
490        {
491            Ok(d) => d,
492            Err(e) => {
493                trace!(name: "resolve_ip_describe_error", %container_name, error = %e);
494                tokio::time::sleep(Duration::from_secs(1)).await;
495                continue;
496            }
497        };
498
499        if let Some(task) = task_details.tasks().first() {
500            // Get private IP from task's network attachment
501            if let Some(ip) = task
502                .attachments()
503                .iter()
504                .flat_map(|a| a.details())
505                .find(|d| d.name() == Some("privateIPv4Address"))
506                .and_then(|d| d.value())
507            {
508                trace!(name: "resolved_ip", %container_name, %ip);
509                return ip.to_string();
510            }
511        }
512
513        trace!(name: "resolve_ip_no_ip", %container_name);
514        tokio::time::sleep(Duration::from_secs(1)).await;
515    }
516}