hydro_lang/deploy/
deploy_runtime_containerized.rs

1#![allow(
2    unused,
3    reason = "unused in trybuild but the __staged version is needed"
4)]
5#![allow(missing_docs, reason = "used internally")]
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::net::SocketAddr;
10use std::ops::{Deref, DerefMut};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::Duration;
15
16use bytes::BytesMut;
17use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
18use proc_macro2::Span;
19use sinktools::demux_map_lazy::LazyDemuxSink;
20use sinktools::lazy::{LazySink, LazySource};
21use sinktools::lazy_sink_source::LazySinkSource;
22use stageleft::runtime_support::{
23    FreeVariableWithContext, FreeVariableWithContextWithProps, QuoteTokens,
24};
25use stageleft::{QuotedWithContext, q};
26use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
27use tokio::net::{TcpListener, TcpStream};
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{debug, instrument};
30
31use crate::location::dynamic::LocationId;
32use crate::location::member_id::TaglessMemberId;
33use crate::location::{MemberId, MembershipEvent};
34
35pub fn deploy_containerized_o2o(target: &str, bind_addr: &str) -> (syn::Expr, syn::Expr) {
36    (
37        q!(LazySink::<_, _, _, bytes::Bytes>::new(move || Box::pin(
38            async move {
39                let target = target;
40                debug!(name: "connecting", %target);
41                Result::<_, std::io::Error>::Ok(FramedWrite::new(
42                    TcpStream::connect(target).await?,
43                    LengthDelimitedCodec::new(),
44                ))
45            }
46        )))
47        .splice_untyped_ctx(&()),
48        q!(LazySource::new(move || Box::pin(async move {
49            let listener = TcpListener::bind(bind_addr).await?;
50            let (stream, peer) = listener.accept().await?;
51            debug!(name: "accepting", ?peer);
52            Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
53        })))
54        .splice_untyped_ctx(&()),
55    )
56}
57
58pub fn deploy_containerized_o2m(port: u16) -> (syn::Expr, syn::Expr) {
59    (
60        QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
61            q!(sinktools::demux_map_lazy::<_, _, _, _>(
62                move |key: &TaglessMemberId| {
63                    let key = key.clone();
64
65                    LazySink::<_, _, _, bytes::Bytes>::new(move || {
66                        Box::pin(async move {
67                            let port = port;
68                            debug!(name: "connecting", target = format!("{}:{}", key.get_container_name(), port));
69                            let mut sink = FramedWrite::new(
70                                TcpStream::connect(format!(
71                                    "{}:{}",
72                                    key.get_container_name(),
73                                    port
74                                ))
75                                .await?,
76                                LengthDelimitedCodec::new(),
77                            );
78
79                            Result::<_, std::io::Error>::Ok(sink)
80                        })
81                    })
82                }
83            )),
84            &(),
85        ),
86        q!(LazySource::new(move || Box::pin(async move {
87            let bind_addr = format!("0.0.0.0:{}", port);
88            debug!(name: "listening", %bind_addr);
89            let listener = TcpListener::bind(bind_addr).await?;
90            let (stream, peer) = listener.accept().await?;
91            debug!(name: "accepting", ?peer);
92
93            Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
94        })))
95        .splice_untyped_ctx(&()),
96    )
97}
98
99pub fn deploy_containerized_m2o(port: u16, target_host: &str) -> (syn::Expr, syn::Expr) {
100    (
101        q!(LazySink::<_, _, _, bytes::Bytes>::new(move || {
102            Box::pin(async move {
103                let target = format!("{}:{}", target_host, port);
104                debug!(name: "connecting", %target);
105
106                let mut sink = FramedWrite::new(
107                    TcpStream::connect(target).await?,
108                    LengthDelimitedCodec::new(),
109                );
110
111                sink.send(bytes::Bytes::from(
112                    bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
113                        .unwrap(),
114                ))
115                .await?;
116
117                Result::<_, std::io::Error>::Ok(sink)
118            })
119        }))
120        .splice_untyped_ctx(&()),
121        QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
122            q!(LazySource::new(move || Box::pin(async move {
123                let bind_addr = format!("0.0.0.0:{}", port);
124                debug!(name: "listening", %bind_addr);
125                let listener = TcpListener::bind(bind_addr).await?;
126                Result::<_, std::io::Error>::Ok(
127                    futures::stream::unfold(listener, |listener| {
128                        Box::pin(async move {
129                            let (stream, peer) = listener.accept().await.ok()?;
130                            let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
131                            let from =
132                                bincode::deserialize::<String>(&source.next().await?.ok()?[..])
133                                    .ok()?;
134
135                            debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
136
137                            Some((
138                                source.map(move |v| {
139                                    v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
140                                }),
141                                listener,
142                            ))
143                        })
144                    })
145                    .flatten_unordered(None),
146                )
147            }))),
148            &(),
149        ),
150    )
151}
152
153pub fn deploy_containerized_m2m(port: u16) -> (syn::Expr, syn::Expr) {
154    (
155        QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
156            q!(sinktools::demux_map_lazy::<_, _, _, _>(
157                move |key: &TaglessMemberId| {
158                    let key = key.clone();
159
160                    LazySink::<_, _, _, bytes::Bytes>::new(move || {
161                        Box::pin(async move {
162                            let port = port;
163                            debug!(name: "connecting", target = format!("{}:{}", key.get_container_name(), port));
164                            let mut sink = FramedWrite::new(
165                                TcpStream::connect(format!(
166                                    "{}:{}",
167                                    key.get_container_name(),
168                                    port
169                                ))
170                                .await?,
171                                LengthDelimitedCodec::new(),
172                            );
173                            debug!(name: "connected", target = format!("{}:{}", key.get_container_name(), port));
174
175                            sink.send(bytes::Bytes::from(
176                                bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
177                                    .unwrap(),
178                            ))
179                            .await?;
180
181                            Result::<_, std::io::Error>::Ok(sink)
182                        })
183                    })
184                }
185            )),
186            &(),
187        ),
188        QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
189            q!(LazySource::new(move || Box::pin(async move {
190                let bind_addr = format!("0.0.0.0:{}", port);
191                debug!(name: "listening", %bind_addr);
192                let listener = TcpListener::bind(bind_addr).await?;
193
194                Result::<_, std::io::Error>::Ok(
195                    futures::stream::unfold(listener, |listener| {
196                        Box::pin(async move {
197                            let (stream, peer) = listener.accept().await.ok()?;
198                            let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
199                            let from =
200                                bincode::deserialize::<String>(&source.next().await?.ok()?[..])
201                                    .ok()?;
202
203                            debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
204
205                            Some((
206                                source.map(move |v| {
207                                    v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
208                                }),
209                                listener,
210                            ))
211                        })
212                    })
213                    .flatten_unordered(None),
214                )
215            }))),
216            &(),
217        ),
218    )
219}
220
221pub struct SocketIdent {
222    pub socket_ident: syn::Ident,
223}
224
225impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for SocketIdent {
226    type O = TcpListener;
227
228    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
229    where
230        Self: Sized,
231    {
232        let ident = self.socket_ident;
233
234        (
235            QuoteTokens {
236                prelude: None,
237                expr: Some(quote::quote! { #ident }),
238            },
239            (),
240        )
241    }
242}
243
244pub fn deploy_containerized_external_sink_source_ident(socket_ident: syn::Ident) -> syn::Expr {
245    let socket_ident = SocketIdent { socket_ident };
246
247    q!(LazySinkSource::<
248        _,
249        FramedRead<OwnedReadHalf, LengthDelimitedCodec>,
250        FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>,
251        bytes::Bytes,
252        std::io::Error,
253    >::new(async move {
254        let (stream, peer) = socket_ident.accept().await?;
255        debug!(name: "external accepting", ?peer);
256        let (rx, tx) = stream.into_split();
257
258        let fr = FramedRead::new(rx, LengthDelimitedCodec::new());
259        let fw = FramedWrite::new(tx, LengthDelimitedCodec::new());
260
261        Result::<_, std::io::Error>::Ok((fr, fw))
262    },))
263    .splice_untyped_ctx(&())
264}
265
266pub fn cluster_ids<'a>() -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone {
267    // unimplemented!(); // this is unused.
268
269    // This is a dummy piece of code, since clusters are dynamic when containerized.
270    q!(Box::leak(Box::new([TaglessMemberId::from_container_name(
271        "INVALID CONTAINER NAME cluster_ids"
272    )]))
273    .as_slice())
274}
275
276pub fn cluster_self_id<'a>() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
277    q!(TaglessMemberId::from_container_name(
278        std::env::var("CONTAINER_NAME").unwrap()
279    ))
280}
281
282pub fn cluster_membership_stream<'a>(
283    location_id: &LocationId,
284) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
285{
286    let raw_id = location_id.raw_id();
287
288    q!(Box::new(self::docker_membership_stream(
289        std::env::var("DEPLOYMENT_INSTANCE").unwrap(),
290        raw_id
291    ))
292        as Box<
293            dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin,
294        >)
295}
296
297// There's a risk of race conditions here since all the containers will be starting up at the same time.
298// So we need to start listening for events and the take a snapshot of currently running containers, since they may have already started up before we started listening to events.
299// Then we need to turn that into a usable stream for the consumer in this current hydro program. The way you do that is by emitting from the snapshot first, and then start emitting from the stream. Keep a hash set around to track whether a container is up or down.
300#[instrument(skip_all, fields(%deployment_instance, %location_id))]
301fn docker_membership_stream(
302    deployment_instance: String,
303    location_id: usize,
304) -> impl Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin {
305    use std::collections::HashSet;
306    use std::sync::{Arc, Mutex};
307
308    use bollard::Docker;
309    use bollard::query_parameters::{EventsOptions, ListContainersOptions};
310    use tokio::sync::mpsc;
311
312    let docker = Docker::connect_with_local_defaults()
313        .unwrap()
314        .with_timeout(Duration::from_secs(1));
315
316    let (event_tx, event_rx) = mpsc::unbounded_channel::<(String, MembershipEvent)>();
317
318    // 1. Start event subscription in a spawned task
319    let events_docker = docker.clone();
320    let events_deployment_instance = deployment_instance.clone();
321    tokio::spawn(async move {
322        let mut filters = HashMap::new();
323        filters.insert("type".to_string(), vec!["container".to_string()]);
324        filters.insert(
325            "event".to_string(),
326            vec!["start".to_string(), "die".to_string()],
327        );
328        let event_options = Some(EventsOptions {
329            filters: Some(filters),
330            ..Default::default()
331        });
332
333        let mut events = events_docker.events(event_options);
334        while let Some(event) = events.next().await {
335            if let Some((name, membership_event)) = event.ok().and_then(|e| {
336                let name = e
337                    .actor
338                    .and_then(|a| a.attributes.and_then(|attrs| attrs.get("name").cloned()))?;
339
340                if name.contains(format!("{events_deployment_instance}-{location_id}").as_str()) {
341                    match e.action.as_deref() {
342                        Some("start") => Some((name.clone(), MembershipEvent::Joined)),
343                        Some("die") => Some((name, MembershipEvent::Left)),
344                        _ => None,
345                    }
346                } else {
347                    None
348                }
349            }) && event_tx.send((name, membership_event)).is_err()
350            {
351                break;
352            }
353        }
354    });
355
356    // Shared state for deduplication across snapshot and events phases
357    let seen_joined = Arc::new(Mutex::new(HashSet::<String>::new()));
358    let seen_joined_snapshot = seen_joined.clone();
359    let seen_joined_events = seen_joined;
360
361    // 2. Snapshot stream - fetch current containers and emit Joined events
362    let snapshot_stream = futures::stream::once(async move {
363        let mut filters = HashMap::new();
364        filters.insert(
365            "name".to_string(),
366            vec![format!("{deployment_instance}-{location_id}")],
367        );
368        let options = Some(ListContainersOptions {
369            filters: Some(filters),
370            ..Default::default()
371        });
372
373        docker
374            .list_containers(options)
375            .await
376            .unwrap_or_default()
377            .into_iter()
378            .filter_map(|c| {
379                c.names
380                    .and_then(|names| names.first().map(|n| n.trim_start_matches('/').to_string()))
381            })
382            .filter_map(|name| {
383                if seen_joined_snapshot.lock().unwrap().insert(name.clone()) {
384                    Some((name, MembershipEvent::Joined))
385                } else {
386                    None
387                }
388            })
389            .collect::<Vec<_>>()
390    })
391    .flat_map(futures::stream::iter);
392
393    // 3. Events stream - process live events with deduplication
394    let events_stream = tokio_stream::StreamExt::filter_map(
395        tokio_stream::wrappers::UnboundedReceiverStream::new(event_rx),
396        move |(name, event)| {
397            let mut seen = seen_joined_events.lock().unwrap();
398            match event {
399                MembershipEvent::Joined => {
400                    if seen.insert(name.clone()) {
401                        Some((name, MembershipEvent::Joined))
402                    } else {
403                        None
404                    }
405                }
406                MembershipEvent::Left => {
407                    if seen.remove(&name) {
408                        Some((name, MembershipEvent::Left))
409                    } else {
410                        None
411                    }
412                }
413            }
414        },
415    );
416
417    // 4. Chain snapshot then events
418    Box::pin(
419        snapshot_stream
420            .chain(events_stream)
421            .map(|(k, v)| (TaglessMemberId::from_container_name(k), v))
422            .inspect(|(member_id, event)| debug!(name: "membership_event", ?member_id, ?event)),
423    )
424}