1#![allow(
2 unused,
3 reason = "unused in trybuild but the __staged version is needed"
4)]
5#![allow(missing_docs, reason = "used internally")]
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::net::SocketAddr;
10use std::ops::{Deref, DerefMut};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::Duration;
15
16use bytes::BytesMut;
17use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
18use proc_macro2::Span;
19use sinktools::demux_map_lazy::LazyDemuxSink;
20use sinktools::lazy::{LazySink, LazySource};
21use sinktools::lazy_sink_source::LazySinkSource;
22use stageleft::runtime_support::{
23 FreeVariableWithContext, FreeVariableWithContextWithProps, QuoteTokens,
24};
25use stageleft::{QuotedWithContext, q};
26use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
27use tokio::net::{TcpListener, TcpStream};
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{debug, instrument};
30
31use crate::location::dynamic::LocationId;
32use crate::location::member_id::TaglessMemberId;
33use crate::location::{MemberId, MembershipEvent};
34
35pub fn deploy_containerized_o2o(target: &str, bind_addr: &str) -> (syn::Expr, syn::Expr) {
36 (
37 q!(LazySink::<_, _, _, bytes::Bytes>::new(move || Box::pin(
38 async move {
39 let target = target;
40 debug!(name: "connecting", %target);
41 Result::<_, std::io::Error>::Ok(FramedWrite::new(
42 TcpStream::connect(target).await?,
43 LengthDelimitedCodec::new(),
44 ))
45 }
46 )))
47 .splice_untyped_ctx(&()),
48 q!(LazySource::new(move || Box::pin(async move {
49 let listener = TcpListener::bind(bind_addr).await?;
50 let (stream, peer) = listener.accept().await?;
51 debug!(name: "accepting", ?peer);
52 Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
53 })))
54 .splice_untyped_ctx(&()),
55 )
56}
57
58pub fn deploy_containerized_o2m(port: u16) -> (syn::Expr, syn::Expr) {
59 (
60 QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
61 q!(sinktools::demux_map_lazy::<_, _, _, _>(
62 move |key: &TaglessMemberId| {
63 let key = key.clone();
64
65 LazySink::<_, _, _, bytes::Bytes>::new(move || {
66 Box::pin(async move {
67 let port = port;
68 debug!(name: "connecting", target = format!("{}:{}", key.get_container_name(), port));
69 let mut sink = FramedWrite::new(
70 TcpStream::connect(format!(
71 "{}:{}",
72 key.get_container_name(),
73 port
74 ))
75 .await?,
76 LengthDelimitedCodec::new(),
77 );
78
79 Result::<_, std::io::Error>::Ok(sink)
80 })
81 })
82 }
83 )),
84 &(),
85 ),
86 q!(LazySource::new(move || Box::pin(async move {
87 let bind_addr = format!("0.0.0.0:{}", port);
88 debug!(name: "listening", %bind_addr);
89 let listener = TcpListener::bind(bind_addr).await?;
90 let (stream, peer) = listener.accept().await?;
91 debug!(name: "accepting", ?peer);
92
93 Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
94 })))
95 .splice_untyped_ctx(&()),
96 )
97}
98
99pub fn deploy_containerized_m2o(port: u16, target_host: &str) -> (syn::Expr, syn::Expr) {
100 (
101 q!(LazySink::<_, _, _, bytes::Bytes>::new(move || {
102 Box::pin(async move {
103 let target = format!("{}:{}", target_host, port);
104 debug!(name: "connecting", %target);
105
106 let mut sink = FramedWrite::new(
107 TcpStream::connect(target).await?,
108 LengthDelimitedCodec::new(),
109 );
110
111 sink.send(bytes::Bytes::from(
112 bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
113 .unwrap(),
114 ))
115 .await?;
116
117 Result::<_, std::io::Error>::Ok(sink)
118 })
119 }))
120 .splice_untyped_ctx(&()),
121 QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
122 q!(LazySource::new(move || Box::pin(async move {
123 let bind_addr = format!("0.0.0.0:{}", port);
124 debug!(name: "listening", %bind_addr);
125 let listener = TcpListener::bind(bind_addr).await?;
126 Result::<_, std::io::Error>::Ok(
127 futures::stream::unfold(listener, |listener| {
128 Box::pin(async move {
129 let (stream, peer) = listener.accept().await.ok()?;
130 let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
131 let from =
132 bincode::deserialize::<String>(&source.next().await?.ok()?[..])
133 .ok()?;
134
135 debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
136
137 Some((
138 source.map(move |v| {
139 v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
140 }),
141 listener,
142 ))
143 })
144 })
145 .flatten_unordered(None),
146 )
147 }))),
148 &(),
149 ),
150 )
151}
152
153pub fn deploy_containerized_m2m(port: u16) -> (syn::Expr, syn::Expr) {
154 (
155 QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
156 q!(sinktools::demux_map_lazy::<_, _, _, _>(
157 move |key: &TaglessMemberId| {
158 let key = key.clone();
159
160 LazySink::<_, _, _, bytes::Bytes>::new(move || {
161 Box::pin(async move {
162 let port = port;
163 debug!(name: "connecting", target = format!("{}:{}", key.get_container_name(), port));
164 let mut sink = FramedWrite::new(
165 TcpStream::connect(format!(
166 "{}:{}",
167 key.get_container_name(),
168 port
169 ))
170 .await?,
171 LengthDelimitedCodec::new(),
172 );
173 debug!(name: "connected", target = format!("{}:{}", key.get_container_name(), port));
174
175 sink.send(bytes::Bytes::from(
176 bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
177 .unwrap(),
178 ))
179 .await?;
180
181 Result::<_, std::io::Error>::Ok(sink)
182 })
183 })
184 }
185 )),
186 &(),
187 ),
188 QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
189 q!(LazySource::new(move || Box::pin(async move {
190 let bind_addr = format!("0.0.0.0:{}", port);
191 debug!(name: "listening", %bind_addr);
192 let listener = TcpListener::bind(bind_addr).await?;
193
194 Result::<_, std::io::Error>::Ok(
195 futures::stream::unfold(listener, |listener| {
196 Box::pin(async move {
197 let (stream, peer) = listener.accept().await.ok()?;
198 let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
199 let from =
200 bincode::deserialize::<String>(&source.next().await?.ok()?[..])
201 .ok()?;
202
203 debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
204
205 Some((
206 source.map(move |v| {
207 v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
208 }),
209 listener,
210 ))
211 })
212 })
213 .flatten_unordered(None),
214 )
215 }))),
216 &(),
217 ),
218 )
219}
220
221pub struct SocketIdent {
222 pub socket_ident: syn::Ident,
223}
224
225impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for SocketIdent {
226 type O = TcpListener;
227
228 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
229 where
230 Self: Sized,
231 {
232 let ident = self.socket_ident;
233
234 (
235 QuoteTokens {
236 prelude: None,
237 expr: Some(quote::quote! { #ident }),
238 },
239 (),
240 )
241 }
242}
243
244pub fn deploy_containerized_external_sink_source_ident(socket_ident: syn::Ident) -> syn::Expr {
245 let socket_ident = SocketIdent { socket_ident };
246
247 q!(LazySinkSource::<
248 _,
249 FramedRead<OwnedReadHalf, LengthDelimitedCodec>,
250 FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>,
251 bytes::Bytes,
252 std::io::Error,
253 >::new(async move {
254 let (stream, peer) = socket_ident.accept().await?;
255 debug!(name: "external accepting", ?peer);
256 let (rx, tx) = stream.into_split();
257
258 let fr = FramedRead::new(rx, LengthDelimitedCodec::new());
259 let fw = FramedWrite::new(tx, LengthDelimitedCodec::new());
260
261 Result::<_, std::io::Error>::Ok((fr, fw))
262 },))
263 .splice_untyped_ctx(&())
264}
265
266pub fn cluster_ids<'a>() -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone {
267 q!(Box::leak(Box::new([TaglessMemberId::from_container_name(
271 "INVALID CONTAINER NAME cluster_ids"
272 )]))
273 .as_slice())
274}
275
276pub fn cluster_self_id<'a>() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
277 q!(TaglessMemberId::from_container_name(
278 std::env::var("CONTAINER_NAME").unwrap()
279 ))
280}
281
282pub fn cluster_membership_stream<'a>(
283 location_id: &LocationId,
284) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
285{
286 let raw_id = location_id.raw_id();
287
288 q!(Box::new(self::docker_membership_stream(
289 std::env::var("DEPLOYMENT_INSTANCE").unwrap(),
290 raw_id
291 ))
292 as Box<
293 dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin,
294 >)
295}
296
297#[instrument(skip_all, fields(%deployment_instance, %location_id))]
301fn docker_membership_stream(
302 deployment_instance: String,
303 location_id: usize,
304) -> impl Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin {
305 use std::collections::HashSet;
306 use std::sync::{Arc, Mutex};
307
308 use bollard::Docker;
309 use bollard::query_parameters::{EventsOptions, ListContainersOptions};
310 use tokio::sync::mpsc;
311
312 let docker = Docker::connect_with_local_defaults()
313 .unwrap()
314 .with_timeout(Duration::from_secs(1));
315
316 let (event_tx, event_rx) = mpsc::unbounded_channel::<(String, MembershipEvent)>();
317
318 let events_docker = docker.clone();
320 let events_deployment_instance = deployment_instance.clone();
321 tokio::spawn(async move {
322 let mut filters = HashMap::new();
323 filters.insert("type".to_string(), vec!["container".to_string()]);
324 filters.insert(
325 "event".to_string(),
326 vec!["start".to_string(), "die".to_string()],
327 );
328 let event_options = Some(EventsOptions {
329 filters: Some(filters),
330 ..Default::default()
331 });
332
333 let mut events = events_docker.events(event_options);
334 while let Some(event) = events.next().await {
335 if let Some((name, membership_event)) = event.ok().and_then(|e| {
336 let name = e
337 .actor
338 .and_then(|a| a.attributes.and_then(|attrs| attrs.get("name").cloned()))?;
339
340 if name.contains(format!("{events_deployment_instance}-{location_id}").as_str()) {
341 match e.action.as_deref() {
342 Some("start") => Some((name.clone(), MembershipEvent::Joined)),
343 Some("die") => Some((name, MembershipEvent::Left)),
344 _ => None,
345 }
346 } else {
347 None
348 }
349 }) && event_tx.send((name, membership_event)).is_err()
350 {
351 break;
352 }
353 }
354 });
355
356 let seen_joined = Arc::new(Mutex::new(HashSet::<String>::new()));
358 let seen_joined_snapshot = seen_joined.clone();
359 let seen_joined_events = seen_joined;
360
361 let snapshot_stream = futures::stream::once(async move {
363 let mut filters = HashMap::new();
364 filters.insert(
365 "name".to_string(),
366 vec![format!("{deployment_instance}-{location_id}")],
367 );
368 let options = Some(ListContainersOptions {
369 filters: Some(filters),
370 ..Default::default()
371 });
372
373 docker
374 .list_containers(options)
375 .await
376 .unwrap_or_default()
377 .into_iter()
378 .filter_map(|c| {
379 c.names
380 .and_then(|names| names.first().map(|n| n.trim_start_matches('/').to_string()))
381 })
382 .filter_map(|name| {
383 if seen_joined_snapshot.lock().unwrap().insert(name.clone()) {
384 Some((name, MembershipEvent::Joined))
385 } else {
386 None
387 }
388 })
389 .collect::<Vec<_>>()
390 })
391 .flat_map(futures::stream::iter);
392
393 let events_stream = tokio_stream::StreamExt::filter_map(
395 tokio_stream::wrappers::UnboundedReceiverStream::new(event_rx),
396 move |(name, event)| {
397 let mut seen = seen_joined_events.lock().unwrap();
398 match event {
399 MembershipEvent::Joined => {
400 if seen.insert(name.clone()) {
401 Some((name, MembershipEvent::Joined))
402 } else {
403 None
404 }
405 }
406 MembershipEvent::Left => {
407 if seen.remove(&name) {
408 Some((name, MembershipEvent::Left))
409 } else {
410 None
411 }
412 }
413 }
414 },
415 );
416
417 Box::pin(
419 snapshot_stream
420 .chain(events_stream)
421 .map(|(k, v)| (TaglessMemberId::from_container_name(k), v))
422 .inspect(|(member_id, event)| debug!(name: "membership_event", ?member_id, ?event)),
423 )
424}