1use std::collections::{BTreeMap, HashMap};
2use std::io::Error;
3use std::marker::PhantomData;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Sink, Stream};
8use proc_macro2::Span;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use stageleft::QuotedWithContext;
12
13use super::built::build_inner;
14use super::compiled::CompiledFlow;
15use super::deploy_provider::{
16 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
17};
18use super::ir::HydroRoot;
19use crate::live_collections::stream::{Ordering, Retries};
20use crate::location::dynamic::LocationId;
21use crate::location::external_process::{
22 ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
23};
24use crate::location::{Cluster, External, Location, Process};
25use crate::staging_util::Invariant;
26
27pub struct DeployFlow<'a, D>
28where
29 D: Deploy<'a>,
30{
31 pub(super) ir: Vec<HydroRoot>,
32
33 pub(super) processes: HashMap<usize, D::Process>,
35
36 pub(super) process_id_name: Vec<(usize, String)>,
39
40 pub(super) externals: HashMap<usize, D::External>,
41 pub(super) external_id_name: Vec<(usize, String)>,
42
43 pub(super) clusters: HashMap<usize, D::Cluster>,
44 pub(super) cluster_id_name: Vec<(usize, String)>,
45
46 pub(super) _phantom: Invariant<'a, D>,
47}
48
49impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
50 pub fn ir(&self) -> &Vec<HydroRoot> {
51 &self.ir
52 }
53
54 pub fn with_process_id_name(
55 mut self,
56 process_id: usize,
57 process_name: String,
58 spec: impl IntoProcessSpec<'a, D>,
59 ) -> Self {
60 self.processes.insert(
61 process_id,
62 spec.into_process_spec().build(process_id, &process_name),
63 );
64 self
65 }
66
67 pub fn with_process<P>(self, process: &Process<P>, spec: impl IntoProcessSpec<'a, D>) -> Self {
68 self.with_process_id_name(process.id, std::any::type_name::<P>().to_string(), spec)
69 }
70
71 pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
72 mut self,
73 spec: impl Fn() -> S,
74 ) -> Self {
75 for (id, name) in &self.process_id_name {
76 self.processes
77 .insert(*id, spec().into_process_spec().build(*id, name));
78 }
79
80 self
81 }
82
83 pub fn with_external<P>(
84 mut self,
85 process: &External<P>,
86 spec: impl ExternalSpec<'a, D>,
87 ) -> Self {
88 let tag_name = std::any::type_name::<P>().to_string();
89 self.externals
90 .insert(process.id, spec.build(process.id, &tag_name));
91 self
92 }
93
94 pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
95 mut self,
96 spec: impl Fn() -> S,
97 ) -> Self {
98 for (id, name) in &self.external_id_name {
99 self.externals.insert(*id, spec().build(*id, name));
100 }
101
102 self
103 }
104
105 pub fn with_cluster_id_name(
106 mut self,
107 cluster_id: usize,
108 cluster_name: String,
109 spec: impl ClusterSpec<'a, D>,
110 ) -> Self {
111 self.clusters
112 .insert(cluster_id, spec.build(cluster_id, &cluster_name));
113 self
114 }
115
116 pub fn with_cluster<C>(self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
117 self.with_cluster_id_name(cluster.id, std::any::type_name::<C>().to_string(), spec)
118 }
119
120 pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
121 mut self,
122 spec: impl Fn() -> S,
123 ) -> Self {
124 for (id, name) in &self.cluster_id_name {
125 self.clusters.insert(*id, spec().build(*id, name));
126 }
127
128 self
129 }
130
131 pub fn preview_compile(&mut self) -> CompiledFlow<'a> {
136 CompiledFlow {
139 dfir: build_inner::<D>(&mut self.ir),
140 extra_stmts: BTreeMap::new(),
141 _phantom: PhantomData,
142 }
143 }
144}
145
146impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
147 pub fn compile(&mut self) -> CompiledFlow<'a> {
151 let mut seen_tees: HashMap<_, _> = HashMap::new();
152 let mut extra_stmts = BTreeMap::new();
153 self.ir.iter_mut().for_each(|leaf| {
154 leaf.compile_network::<D>(
155 &mut extra_stmts,
156 &mut seen_tees,
157 &self.processes,
158 &self.clusters,
159 &self.externals,
160 );
161 });
162
163 CompiledFlow {
164 dfir: build_inner::<D>(&mut self.ir),
165 extra_stmts,
166 _phantom: PhantomData,
167 }
168 }
169
170 fn cluster_id_stmts(&self, extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>) {
172 let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
173 all_clusters_sorted.sort();
174
175 for &c_id in all_clusters_sorted {
176 let self_id_ident = syn::Ident::new(
177 &format!("__hydro_lang_cluster_self_id_{}", c_id),
178 Span::call_site(),
179 );
180 let self_id_expr = D::cluster_self_id().splice_untyped();
181 extra_stmts
182 .entry(c_id)
183 .or_default()
184 .push(syn::parse_quote! {
185 let #self_id_ident = &*Box::leak(Box::new(#self_id_expr));
186 });
187
188 for other_location in self.processes.keys().chain(self.clusters.keys()) {
189 let other_id_ident = syn::Ident::new(
190 &format!("__hydro_lang_cluster_ids_{}", c_id),
191 Span::call_site(),
192 );
193 let other_id_expr = D::cluster_ids(c_id).splice_untyped();
194 extra_stmts
195 .entry(*other_location)
196 .or_default()
197 .push(syn::parse_quote! {
198 let #other_id_ident = #other_id_expr;
199 });
200 }
201 }
202 }
203
204 #[must_use]
212 pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
213 let CompiledFlow {
214 dfir,
215 mut extra_stmts,
216 _phantom,
217 } = self.compile();
218
219 let mut compiled = dfir;
220 self.cluster_id_stmts(&mut extra_stmts);
221 let mut meta = D::Meta::default();
222
223 let (mut processes, mut clusters, mut externals) = (
224 std::mem::take(&mut self.processes)
225 .into_iter()
226 .filter_map(|(node_id, node)| {
227 if let Some(ir) = compiled.remove(&node_id) {
228 node.instantiate(
229 env,
230 &mut meta,
231 ir,
232 extra_stmts.remove(&node_id).unwrap_or_default(),
233 );
234 Some((node_id, node))
235 } else {
236 None
237 }
238 })
239 .collect::<HashMap<_, _>>(),
240 std::mem::take(&mut self.clusters)
241 .into_iter()
242 .filter_map(|(cluster_id, cluster)| {
243 if let Some(ir) = compiled.remove(&cluster_id) {
244 cluster.instantiate(
245 env,
246 &mut meta,
247 ir,
248 extra_stmts.remove(&cluster_id).unwrap_or_default(),
249 );
250 Some((cluster_id, cluster))
251 } else {
252 None
253 }
254 })
255 .collect::<HashMap<_, _>>(),
256 std::mem::take(&mut self.externals)
257 .into_iter()
258 .map(|(external_id, external)| {
259 external.instantiate(
260 env,
261 &mut meta,
262 Default::default(),
263 extra_stmts.remove(&external_id).unwrap_or_default(),
264 );
265 (external_id, external)
266 })
267 .collect::<HashMap<_, _>>(),
268 );
269
270 for node in processes.values_mut() {
271 node.update_meta(&meta);
272 }
273
274 for cluster in clusters.values_mut() {
275 cluster.update_meta(&meta);
276 }
277
278 for external in externals.values_mut() {
279 external.update_meta(&meta);
280 }
281
282 let mut seen_tees_connect = HashMap::new();
283 self.ir.iter_mut().for_each(|leaf| {
284 leaf.connect_network(&mut seen_tees_connect);
285 });
286
287 DeployResult {
288 processes,
289 clusters,
290 externals,
291 cluster_id_name: std::mem::take(&mut self.cluster_id_name)
292 .into_iter()
293 .collect(),
294 process_id_name: std::mem::take(&mut self.process_id_name)
295 .into_iter()
296 .collect(),
297 }
298 }
299}
300
301pub struct DeployResult<'a, D: Deploy<'a>> {
302 processes: HashMap<usize, D::Process>,
303 clusters: HashMap<usize, D::Cluster>,
304 externals: HashMap<usize, D::External>,
305 cluster_id_name: HashMap<usize, String>,
306 process_id_name: HashMap<usize, String>,
307}
308
309impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
310 pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
311 let id = match p.id() {
312 LocationId::Process(id) => id,
313 _ => panic!("Process ID expected"),
314 };
315
316 self.processes.get(&id).unwrap()
317 }
318
319 pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
320 let id = match c.id() {
321 LocationId::Cluster(id) => id,
322 _ => panic!("Cluster ID expected"),
323 };
324
325 self.clusters.get(&id).unwrap()
326 }
327
328 pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, String, &D::Cluster)> {
329 self.clusters.iter().map(|(&id, c)| {
330 (
331 LocationId::Cluster(id),
332 self.cluster_id_name.get(&id).unwrap().clone(),
333 c,
334 )
335 })
336 }
337
338 pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, String, &D::Process)> {
339 self.processes.iter().map(|(&id, p)| {
340 (
341 LocationId::Process(id),
342 self.process_id_name.get(&id).unwrap().clone(),
343 p,
344 )
345 })
346 }
347
348 pub fn get_external<P>(&self, p: &External<P>) -> &D::External {
349 self.externals.get(&p.id).unwrap()
350 }
351
352 #[deprecated(note = "use `connect` instead")]
353 pub async fn connect_bytes<M>(
354 &self,
355 port: ExternalBytesPort<M>,
356 ) -> (
357 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
358 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
359 ) {
360 self.connect(port).await
361 }
362
363 #[deprecated(note = "use `connect` instead")]
364 pub async fn connect_sink_bytes<M>(
365 &self,
366 port: ExternalBytesPort<M>,
367 ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
368 self.connect(port).await.1
369 }
370
371 pub async fn connect_bincode<
372 InT: Serialize + 'static,
373 OutT: DeserializeOwned + 'static,
374 Many,
375 >(
376 &self,
377 port: ExternalBincodeBidi<InT, OutT, Many>,
378 ) -> (
379 Pin<Box<dyn Stream<Item = OutT>>>,
380 Pin<Box<dyn Sink<InT, Error = Error>>>,
381 ) {
382 self.externals
383 .get(&port.process_id)
384 .unwrap()
385 .as_bincode_bidi(port.port_id)
386 .await
387 }
388
389 #[deprecated(note = "use `connect` instead")]
390 pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
391 &self,
392 port: ExternalBincodeSink<T, Many>,
393 ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
394 self.connect(port).await
395 }
396
397 #[deprecated(note = "use `connect` instead")]
398 pub async fn connect_source_bytes(
399 &self,
400 port: ExternalBytesPort,
401 ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
402 self.connect(port).await.0
403 }
404
405 #[deprecated(note = "use `connect` instead")]
406 pub async fn connect_source_bincode<
407 T: Serialize + DeserializeOwned + 'static,
408 O: Ordering,
409 R: Retries,
410 >(
411 &self,
412 port: ExternalBincodeStream<T, O, R>,
413 ) -> Pin<Box<dyn Stream<Item = T>>> {
414 self.connect(port).await
415 }
416
417 pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
418 &'b self,
419 port: P,
420 ) -> <P as ConnectableAsync<&'b Self>>::Output {
421 port.connect(self).await
422 }
423}
424
425#[cfg(stageleft_runtime)]
426#[cfg(feature = "deploy")]
427#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
428impl DeployResult<'_, crate::deploy::HydroDeploy> {
429 pub fn raw_port<M>(
431 &self,
432 port: ExternalBytesPort<M>,
433 ) -> hydro_deploy::custom_service::CustomClientPort {
434 self.externals
435 .get(&port.process_id)
436 .unwrap()
437 .raw_port(port.port_id)
438 }
439}
440
441pub trait ConnectableAsync<Ctx> {
442 type Output;
443
444 fn connect(self, ctx: Ctx) -> impl Future<Output = Self::Output>;
445}
446
447impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
448 type Output = (
449 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
450 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
451 );
452
453 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
454 ctx.externals
455 .get(&self.process_id)
456 .unwrap()
457 .as_bytes_bidi(self.port_id)
458 .await
459 }
460}
461
462impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
463 ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
464{
465 type Output = Pin<Box<dyn Stream<Item = T>>>;
466
467 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
468 ctx.externals
469 .get(&self.process_id)
470 .unwrap()
471 .as_bincode_source(self.port_id)
472 .await
473 }
474}
475
476impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
477 for ExternalBincodeSink<T, Many>
478{
479 type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
480
481 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
482 ctx.externals
483 .get(&self.process_id)
484 .unwrap()
485 .as_bincode_sink(self.port_id)
486 .await
487 }
488}