1use std::cell::UnsafeCell;
2use std::collections::{BTreeMap, HashMap};
3use std::io::Error;
4use std::marker::PhantomData;
5use std::pin::Pin;
6
7use bytes::{Bytes, BytesMut};
8use futures::{Sink, Stream};
9use proc_macro2::Span;
10use serde::Serialize;
11use serde::de::DeserializeOwned;
12use stageleft::QuotedWithContext;
13
14use super::built::build_inner;
15use super::compiled::CompiledFlow;
16use super::deploy_provider::{
17 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
18};
19use super::ir::HydroRoot;
20use crate::live_collections::stream::{Ordering, Retries};
21use crate::location::dynamic::LocationId;
22use crate::location::external_process::{
23 ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
24};
25use crate::location::{Cluster, External, Location, Process};
26use crate::staging_util::Invariant;
27
28pub struct DeployFlow<'a, D>
29where
30 D: Deploy<'a>,
31{
32 pub(super) ir: UnsafeCell<Vec<HydroRoot>>,
36
37 pub(super) processes: HashMap<usize, D::Process>,
39
40 pub(super) process_id_name: Vec<(usize, String)>,
43
44 pub(super) externals: HashMap<usize, D::External>,
45 pub(super) external_id_name: Vec<(usize, String)>,
46
47 pub(super) clusters: HashMap<usize, D::Cluster>,
48 pub(super) cluster_id_name: Vec<(usize, String)>,
49
50 pub(super) _phantom: Invariant<'a, D>,
51}
52
53impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
54 pub fn ir(&self) -> &Vec<HydroRoot> {
55 unsafe {
56 &*self.ir.get()
58 }
59 }
60
61 pub fn with_process_id_name(
62 mut self,
63 process_id: usize,
64 process_name: String,
65 spec: impl IntoProcessSpec<'a, D>,
66 ) -> Self {
67 self.processes.insert(
68 process_id,
69 spec.into_process_spec().build(process_id, &process_name),
70 );
71 self
72 }
73
74 pub fn with_process<P>(self, process: &Process<P>, spec: impl IntoProcessSpec<'a, D>) -> Self {
75 self.with_process_id_name(process.id, std::any::type_name::<P>().to_string(), spec)
76 }
77
78 pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
79 mut self,
80 spec: impl Fn() -> S,
81 ) -> Self {
82 for (id, name) in &self.process_id_name {
83 self.processes
84 .insert(*id, spec().into_process_spec().build(*id, name));
85 }
86
87 self
88 }
89
90 pub fn with_external<P>(
91 mut self,
92 process: &External<P>,
93 spec: impl ExternalSpec<'a, D>,
94 ) -> Self {
95 let tag_name = std::any::type_name::<P>().to_string();
96 self.externals
97 .insert(process.id, spec.build(process.id, &tag_name));
98 self
99 }
100
101 pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
102 mut self,
103 spec: impl Fn() -> S,
104 ) -> Self {
105 for (id, name) in &self.external_id_name {
106 self.externals.insert(*id, spec().build(*id, name));
107 }
108
109 self
110 }
111
112 pub fn with_cluster_id_name(
113 mut self,
114 cluster_id: usize,
115 cluster_name: String,
116 spec: impl ClusterSpec<'a, D>,
117 ) -> Self {
118 self.clusters
119 .insert(cluster_id, spec.build(cluster_id, &cluster_name));
120 self
121 }
122
123 pub fn with_cluster<C>(self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
124 self.with_cluster_id_name(cluster.id, std::any::type_name::<C>().to_string(), spec)
125 }
126
127 pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
128 mut self,
129 spec: impl Fn() -> S,
130 ) -> Self {
131 for (id, name) in &self.cluster_id_name {
132 self.clusters.insert(*id, spec().build(*id, name));
133 }
134
135 self
136 }
137
138 pub fn preview_compile(&self) -> CompiledFlow<'a, ()> {
141 CompiledFlow {
142 dfir: build_inner(unsafe {
143 &mut *self.ir.get()
146 }),
147 _phantom: PhantomData,
148 }
149 }
150
151 pub fn compile_no_network(mut self) -> CompiledFlow<'a, D::GraphId> {
152 CompiledFlow {
153 dfir: build_inner(self.ir.get_mut()),
154 _phantom: PhantomData,
155 }
156 }
157}
158
159impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
160 pub fn compile(mut self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
161 let mut seen_tees: HashMap<_, _> = HashMap::new();
162 let mut extra_stmts = BTreeMap::new();
163 self.ir.get_mut().iter_mut().for_each(|leaf| {
164 leaf.compile_network::<D>(
165 env,
166 &mut extra_stmts,
167 &mut seen_tees,
168 &self.processes,
169 &self.clusters,
170 &self.externals,
171 );
172 });
173
174 CompiledFlow {
175 dfir: build_inner(self.ir.get_mut()),
176 _phantom: PhantomData,
177 }
178 }
179
180 fn cluster_id_stmts(
181 &self,
182 extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
183 env: &<D as Deploy<'a>>::CompileEnv,
184 ) {
185 let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
186 all_clusters_sorted.sort();
187
188 for &c_id in all_clusters_sorted {
189 let self_id_ident = syn::Ident::new(
190 &format!("__hydro_lang_cluster_self_id_{}", c_id),
191 Span::call_site(),
192 );
193 let self_id_expr = D::cluster_self_id(env).splice_untyped();
194 extra_stmts
195 .entry(c_id)
196 .or_default()
197 .push(syn::parse_quote! {
198 let #self_id_ident = #self_id_expr;
199 });
200
201 for other_location in self.processes.keys().chain(self.clusters.keys()) {
202 let other_id_ident = syn::Ident::new(
203 &format!("__hydro_lang_cluster_ids_{}", c_id),
204 Span::call_site(),
205 );
206 let other_id_expr = D::cluster_ids(env, c_id).splice_untyped();
207 extra_stmts
208 .entry(*other_location)
209 .or_default()
210 .push(syn::parse_quote! {
211 let #other_id_ident = #other_id_expr;
212 });
213 }
214 }
215 }
216}
217
218impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> {
219 #[must_use]
220 pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
221 let mut seen_tees_instantiate: HashMap<_, _> = HashMap::new();
222 let mut extra_stmts = BTreeMap::new();
223 self.ir.get_mut().iter_mut().for_each(|leaf| {
224 leaf.compile_network::<D>(
225 &(),
226 &mut extra_stmts,
227 &mut seen_tees_instantiate,
228 &self.processes,
229 &self.clusters,
230 &self.externals,
231 );
232 });
233
234 let mut compiled = build_inner(self.ir.get_mut());
235 self.cluster_id_stmts(&mut extra_stmts, &());
236 let mut meta = D::Meta::default();
237
238 let (mut processes, mut clusters, mut externals) = (
239 std::mem::take(&mut self.processes)
240 .into_iter()
241 .filter_map(|(node_id, node)| {
242 if let Some(ir) = compiled.remove(&node_id) {
243 node.instantiate(
244 env,
245 &mut meta,
246 ir,
247 extra_stmts.remove(&node_id).unwrap_or_default(),
248 );
249 Some((node_id, node))
250 } else {
251 None
252 }
253 })
254 .collect::<HashMap<_, _>>(),
255 std::mem::take(&mut self.clusters)
256 .into_iter()
257 .filter_map(|(cluster_id, cluster)| {
258 if let Some(ir) = compiled.remove(&cluster_id) {
259 cluster.instantiate(
260 env,
261 &mut meta,
262 ir,
263 extra_stmts.remove(&cluster_id).unwrap_or_default(),
264 );
265 Some((cluster_id, cluster))
266 } else {
267 None
268 }
269 })
270 .collect::<HashMap<_, _>>(),
271 std::mem::take(&mut self.externals)
272 .into_iter()
273 .map(|(external_id, external)| {
274 external.instantiate(
275 env,
276 &mut meta,
277 Default::default(),
278 extra_stmts.remove(&external_id).unwrap_or_default(),
279 );
280 (external_id, external)
281 })
282 .collect::<HashMap<_, _>>(),
283 );
284
285 for node in processes.values_mut() {
286 node.update_meta(&meta);
287 }
288
289 for cluster in clusters.values_mut() {
290 cluster.update_meta(&meta);
291 }
292
293 for external in externals.values_mut() {
294 external.update_meta(&meta);
295 }
296
297 let mut seen_tees_connect = HashMap::new();
298 self.ir.get_mut().iter_mut().for_each(|leaf| {
299 leaf.connect_network(&mut seen_tees_connect);
300 });
301
302 DeployResult {
303 processes,
304 clusters,
305 externals,
306 cluster_id_name: std::mem::take(&mut self.cluster_id_name)
307 .into_iter()
308 .collect(),
309 process_id_name: std::mem::take(&mut self.process_id_name)
310 .into_iter()
311 .collect(),
312 }
313 }
314}
315
316pub struct DeployResult<'a, D: Deploy<'a>> {
317 processes: HashMap<usize, D::Process>,
318 clusters: HashMap<usize, D::Cluster>,
319 externals: HashMap<usize, D::External>,
320 cluster_id_name: HashMap<usize, String>,
321 process_id_name: HashMap<usize, String>,
322}
323
324impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
325 pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
326 let id = match p.id() {
327 LocationId::Process(id) => id,
328 _ => panic!("Process ID expected"),
329 };
330
331 self.processes.get(&id).unwrap()
332 }
333
334 pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
335 let id = match c.id() {
336 LocationId::Cluster(id) => id,
337 _ => panic!("Cluster ID expected"),
338 };
339
340 self.clusters.get(&id).unwrap()
341 }
342
343 pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, String, &D::Cluster)> {
344 self.clusters.iter().map(|(&id, c)| {
345 (
346 LocationId::Cluster(id),
347 self.cluster_id_name.get(&id).unwrap().clone(),
348 c,
349 )
350 })
351 }
352
353 pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, String, &D::Process)> {
354 self.processes.iter().map(|(&id, p)| {
355 (
356 LocationId::Process(id),
357 self.process_id_name.get(&id).unwrap().clone(),
358 p,
359 )
360 })
361 }
362
363 pub fn get_external<P>(&self, p: &External<P>) -> &D::External {
364 self.externals.get(&p.id).unwrap()
365 }
366
367 pub fn raw_port<M>(&self, port: ExternalBytesPort<M>) -> D::ExternalRawPort {
368 self.externals
369 .get(&port.process_id)
370 .unwrap()
371 .raw_port(port.port_id)
372 }
373
374 #[deprecated(note = "use `connect` instead")]
375 pub async fn connect_bytes<M>(
376 &self,
377 port: ExternalBytesPort<M>,
378 ) -> (
379 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
380 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
381 ) {
382 self.connect(port).await
383 }
384
385 #[deprecated(note = "use `connect` instead")]
386 pub async fn connect_sink_bytes<M>(
387 &self,
388 port: ExternalBytesPort<M>,
389 ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
390 self.connect(port).await.1
391 }
392
393 pub async fn connect_bincode<
394 InT: Serialize + 'static,
395 OutT: DeserializeOwned + 'static,
396 Many,
397 >(
398 &self,
399 port: ExternalBincodeBidi<InT, OutT, Many>,
400 ) -> (
401 Pin<Box<dyn Stream<Item = OutT>>>,
402 Pin<Box<dyn Sink<InT, Error = Error>>>,
403 ) {
404 self.externals
405 .get(&port.process_id)
406 .unwrap()
407 .as_bincode_bidi(port.port_id)
408 .await
409 }
410
411 #[deprecated(note = "use `connect` instead")]
412 pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
413 &self,
414 port: ExternalBincodeSink<T, Many>,
415 ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
416 self.connect(port).await
417 }
418
419 #[deprecated(note = "use `connect` instead")]
420 pub async fn connect_source_bytes(
421 &self,
422 port: ExternalBytesPort,
423 ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
424 self.connect(port).await.0
425 }
426
427 #[deprecated(note = "use `connect` instead")]
428 pub async fn connect_source_bincode<
429 T: Serialize + DeserializeOwned + 'static,
430 O: Ordering,
431 R: Retries,
432 >(
433 &self,
434 port: ExternalBincodeStream<T, O, R>,
435 ) -> Pin<Box<dyn Stream<Item = T>>> {
436 self.connect(port).await
437 }
438
439 pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
440 &'b self,
441 port: P,
442 ) -> <P as ConnectableAsync<&'b Self>>::Output {
443 port.connect(self).await
444 }
445}
446
447pub trait ConnectableAsync<Ctx> {
448 type Output;
449
450 fn connect(self, ctx: Ctx) -> impl Future<Output = Self::Output>;
451}
452
453impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
454 type Output = (
455 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
456 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
457 );
458
459 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
460 ctx.externals
461 .get(&self.process_id)
462 .unwrap()
463 .as_bytes_bidi(self.port_id)
464 .await
465 }
466}
467
468impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
469 ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
470{
471 type Output = Pin<Box<dyn Stream<Item = T>>>;
472
473 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
474 ctx.externals
475 .get(&self.process_id)
476 .unwrap()
477 .as_bincode_source(self.port_id)
478 .await
479 }
480}
481
482impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
483 for ExternalBincodeSink<T, Many>
484{
485 type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
486
487 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
488 ctx.externals
489 .get(&self.process_id)
490 .unwrap()
491 .as_bincode_sink(self.port_id)
492 .await
493 }
494}