1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
32#[derive(Clone, Hash)]
36pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39 fn from(expr: syn::Expr) -> Self {
40 Self(Box::new(expr))
41 }
42}
43
44impl Deref for DebugExpr {
45 type Target = syn::Expr;
46
47 fn deref(&self) -> &Self::Target {
48 &self.0
49 }
50}
51
52impl ToTokens for DebugExpr {
53 fn to_tokens(&self, tokens: &mut TokenStream) {
54 self.0.to_tokens(tokens);
55 }
56}
57
58impl Debug for DebugExpr {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 write!(f, "{}", self.0.to_token_stream())
61 }
62}
63
64impl Display for DebugExpr {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 let original = self.0.as_ref().clone();
67 let simplified = simplify_q_macro(original);
68
69 write!(f, "q!({})", quote::quote!(#simplified))
72 }
73}
74
75fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77 let mut simplifier = QMacroSimplifier::new();
80 simplifier.visit_expr_mut(&mut expr);
81
82 if let Some(simplified) = simplifier.simplified_result {
84 simplified
85 } else {
86 expr
87 }
88}
89
90#[derive(Default)]
92pub struct QMacroSimplifier {
93 pub simplified_result: Option<syn::Expr>,
94}
95
96impl QMacroSimplifier {
97 pub fn new() -> Self {
98 Self::default()
99 }
100}
101
102impl VisitMut for QMacroSimplifier {
103 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104 if self.simplified_result.is_some() {
106 return;
107 }
108
109 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110 && self.is_stageleft_runtime_support_call(&path_expr.path)
112 && let Some(closure) = self.extract_closure_from_args(&call.args)
114 {
115 self.simplified_result = Some(closure);
116 return;
117 }
118
119 syn::visit_mut::visit_expr_mut(self, expr);
122 }
123}
124
125impl QMacroSimplifier {
126 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127 if let Some(last_segment) = path.segments.last() {
129 let fn_name = last_segment.ident.to_string();
130 fn_name.contains("_type_hint")
132 && path.segments.len() > 2
133 && path.segments[0].ident == "stageleft"
134 && path.segments[1].ident == "runtime_support"
135 } else {
136 false
137 }
138 }
139
140 fn extract_closure_from_args(
141 &self,
142 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143 ) -> Option<syn::Expr> {
144 for arg in args {
146 if let syn::Expr::Closure(_) = arg {
147 return Some(arg.clone());
148 }
149 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151 return Some(closure_expr);
152 }
153 }
154 None
155 }
156
157 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158 let mut visitor = ClosureFinder {
159 found_closure: None,
160 prefer_inner_blocks: true,
161 };
162 visitor.visit_expr(expr);
163 visitor.found_closure
164 }
165}
166
167struct ClosureFinder {
169 found_closure: Option<syn::Expr>,
170 prefer_inner_blocks: bool,
171}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175 if self.found_closure.is_some() {
177 return;
178 }
179
180 match expr {
181 syn::Expr::Closure(_) => {
182 self.found_closure = Some(expr.clone());
183 }
184 syn::Expr::Block(block) if self.prefer_inner_blocks => {
185 for stmt in &block.block.stmts {
187 if let syn::Stmt::Expr(stmt_expr, _) = stmt
188 && let syn::Expr::Block(_) = stmt_expr
189 {
190 let mut inner_visitor = ClosureFinder {
192 found_closure: None,
193 prefer_inner_blocks: false, };
195 inner_visitor.visit_expr(stmt_expr);
196 if inner_visitor.found_closure.is_some() {
197 self.found_closure = Some(stmt_expr.clone());
199 return;
200 }
201 }
202 }
203
204 visit::visit_expr(self, expr);
206
207 if self.found_closure.is_some() {
210 }
212 }
213 _ => {
214 visit::visit_expr(self, expr);
216 }
217 }
218 }
219}
220
221#[derive(Clone, Hash)]
225pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228 fn from(t: syn::Type) -> Self {
229 Self(Box::new(t))
230 }
231}
232
233impl Deref for DebugType {
234 type Target = syn::Type;
235
236 fn deref(&self) -> &Self::Target {
237 &self.0
238 }
239}
240
241impl ToTokens for DebugType {
242 fn to_tokens(&self, tokens: &mut TokenStream) {
243 self.0.to_tokens(tokens);
244 }
245}
246
247impl Debug for DebugType {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 write!(f, "{}", self.0.to_token_stream())
250 }
251}
252
253pub enum DebugInstantiate {
254 Building,
255 Finalized(Box<DebugInstantiateFinalized>),
256}
257
258#[cfg_attr(
259 not(feature = "build"),
260 expect(
261 dead_code,
262 reason = "sink, source unused without `feature = \"build\"`."
263 )
264)]
265pub struct DebugInstantiateFinalized {
266 sink: syn::Expr,
267 source: syn::Expr,
268 connect_fn: Option<Box<dyn FnOnce()>>,
269}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272 fn from(f: DebugInstantiateFinalized) -> Self {
273 Self::Finalized(Box::new(f))
274 }
275}
276
277impl Debug for DebugInstantiate {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 write!(f, "<network instantiate>")
280 }
281}
282
283impl Hash for DebugInstantiate {
284 fn hash<H: Hasher>(&self, _state: &mut H) {
285 }
287}
288
289impl Clone for DebugInstantiate {
290 fn clone(&self) -> Self {
291 match self {
292 DebugInstantiate::Building => DebugInstantiate::Building,
293 DebugInstantiate::Finalized(_) => {
294 panic!("DebugInstantiate::Finalized should not be cloned")
295 }
296 }
297 }
298}
299
300#[derive(Debug, Hash, Clone)]
302pub enum HydroSource {
303 Stream(DebugExpr),
304 ExternalNetwork(),
305 Iter(DebugExpr),
306 Spin(),
307}
308
309#[cfg(feature = "build")]
310pub enum BuildersOrCallback<'a, L, N>
311where
312 L: FnMut(&mut HydroRoot, &mut usize),
313 N: FnMut(&mut HydroNode, &mut usize),
314{
315 Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
316 Callback(L, N),
317}
318
319#[derive(Debug, Hash)]
323pub enum HydroRoot {
324 ForEach {
325 f: DebugExpr,
326 input: Box<HydroNode>,
327 op_metadata: HydroIrOpMetadata,
328 },
329 SendExternal {
330 to_external_id: usize,
331 to_key: usize,
332 to_many: bool,
333 serialize_fn: Option<DebugExpr>,
334 instantiate_fn: DebugInstantiate,
335 input: Box<HydroNode>,
336 op_metadata: HydroIrOpMetadata,
337 },
338 DestSink {
339 sink: DebugExpr,
340 input: Box<HydroNode>,
341 op_metadata: HydroIrOpMetadata,
342 },
343 CycleSink {
344 ident: syn::Ident,
345 input: Box<HydroNode>,
346 out_location: LocationId,
347 op_metadata: HydroIrOpMetadata,
348 },
349}
350
351impl HydroRoot {
352 #[cfg(feature = "build")]
353 pub fn compile_network<'a, D>(
354 &mut self,
355 compile_env: &D::CompileEnv,
356 extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
357 seen_tees: &mut SeenTees,
358 processes: &HashMap<usize, D::Process>,
359 clusters: &HashMap<usize, D::Cluster>,
360 externals: &HashMap<usize, D::External>,
361 ) where
362 D: Deploy<'a>,
363 {
364 self.transform_bottom_up(
365 &mut |l| {
366 if let HydroRoot::SendExternal {
367 input,
368 to_external_id,
369 to_key,
370 to_many,
371 instantiate_fn,
372 ..
373 } = l
374 {
375 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
376 DebugInstantiate::Building => {
377 let to_node = externals
378 .get(to_external_id)
379 .unwrap_or_else(|| {
380 panic!("A external used in the graph was not instantiated: {}", to_external_id)
381 })
382 .clone();
383
384 match input.metadata().location_kind.root() {
385 LocationId::Process(process_id) => {
386 if *to_many {
387 (
388 (
389 D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
390 parse_quote!(DUMMY),
391 ),
392 Box::new(|| {}) as Box<dyn FnOnce()>,
393 )
394 } else {
395 let from_node = processes
396 .get(process_id)
397 .unwrap_or_else(|| {
398 panic!("A process used in the graph was not instantiated: {}", process_id)
399 })
400 .clone();
401
402 let sink_port = D::allocate_process_port(&from_node);
403 let source_port = D::allocate_external_port(&to_node);
404
405 to_node.register(*to_key, source_port.clone());
406
407 (
408 (
409 D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
410 parse_quote!(DUMMY),
411 ),
412 D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
413 )
414 }
415 }
416 LocationId::Cluster(_) => todo!(),
417 _ => panic!()
418 }
419 },
420
421 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
422 };
423
424 *instantiate_fn = DebugInstantiateFinalized {
425 sink: sink_expr,
426 source: source_expr,
427 connect_fn: Some(connect_fn),
428 }
429 .into();
430 }
431 },
432 &mut |n| {
433 if let HydroNode::Network {
434 input,
435 instantiate_fn,
436 metadata,
437 ..
438 } = n
439 {
440 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
441 DebugInstantiate::Building => instantiate_network::<D>(
442 input.metadata().location_kind.root(),
443 metadata.location_kind.root(),
444 processes,
445 clusters,
446 compile_env,
447 ),
448
449 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
450 };
451
452 *instantiate_fn = DebugInstantiateFinalized {
453 sink: sink_expr,
454 source: source_expr,
455 connect_fn: Some(connect_fn),
456 }
457 .into();
458 } else if let HydroNode::ExternalInput {
459 from_external_id,
460 from_key,
461 from_many,
462 codec_type,
463 port_hint,
464 instantiate_fn,
465 metadata,
466 ..
467 } = n
468 {
469 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
470 DebugInstantiate::Building => {
471 let from_node = externals
472 .get(from_external_id)
473 .unwrap_or_else(|| {
474 panic!(
475 "A external used in the graph was not instantiated: {}",
476 from_external_id
477 )
478 })
479 .clone();
480
481 match metadata.location_kind.root() {
482 LocationId::Process(process_id) => {
483 let to_node = processes
484 .get(process_id)
485 .unwrap_or_else(|| {
486 panic!("A process used in the graph was not instantiated: {}", process_id)
487 })
488 .clone();
489
490 let sink_port = D::allocate_external_port(&from_node);
491 let source_port = D::allocate_process_port(&to_node);
492
493 from_node.register(*from_key, sink_port.clone());
494
495 (
496 (
497 parse_quote!(DUMMY),
498 if *from_many {
499 D::e2o_many_source(
500 compile_env,
501 extra_stmts.entry(*process_id).or_default(),
502 &to_node, &source_port,
503 codec_type.0.as_ref(),
504 format!("{}_{}", *from_external_id, *from_key)
505 )
506 } else {
507 D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port)
508 },
509 ),
510 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
511 )
512 }
513 LocationId::Cluster(_) => todo!(),
514 _ => panic!()
515 }
516 },
517
518 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
519 };
520
521 *instantiate_fn = DebugInstantiateFinalized {
522 sink: sink_expr,
523 source: source_expr,
524 connect_fn: Some(connect_fn),
525 }
526 .into();
527 }
528 },
529 seen_tees,
530 false,
531 );
532 }
533
534 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
535 self.transform_bottom_up(
536 &mut |l| {
537 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
538 match instantiate_fn {
539 DebugInstantiate::Building => panic!("network not built"),
540
541 DebugInstantiate::Finalized(finalized) => {
542 (finalized.connect_fn.take().unwrap())();
543 }
544 }
545 }
546 },
547 &mut |n| {
548 if let HydroNode::Network { instantiate_fn, .. }
549 | HydroNode::ExternalInput { instantiate_fn, .. } = n
550 {
551 match instantiate_fn {
552 DebugInstantiate::Building => panic!("network not built"),
553
554 DebugInstantiate::Finalized(finalized) => {
555 (finalized.connect_fn.take().unwrap())();
556 }
557 }
558 }
559 },
560 seen_tees,
561 false,
562 );
563 }
564
565 pub fn transform_bottom_up(
566 &mut self,
567 transform_root: &mut impl FnMut(&mut HydroRoot),
568 transform_node: &mut impl FnMut(&mut HydroNode),
569 seen_tees: &mut SeenTees,
570 check_well_formed: bool,
571 ) {
572 self.transform_children(
573 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
574 seen_tees,
575 );
576
577 transform_root(self);
578 }
579
580 pub fn transform_children(
581 &mut self,
582 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
583 seen_tees: &mut SeenTees,
584 ) {
585 match self {
586 HydroRoot::ForEach { input, .. }
587 | HydroRoot::SendExternal { input, .. }
588 | HydroRoot::DestSink { input, .. }
589 | HydroRoot::CycleSink { input, .. } => {
590 transform(input, seen_tees);
591 }
592 }
593 }
594
595 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
596 match self {
597 HydroRoot::ForEach {
598 f,
599 input,
600 op_metadata,
601 } => HydroRoot::ForEach {
602 f: f.clone(),
603 input: Box::new(input.deep_clone(seen_tees)),
604 op_metadata: op_metadata.clone(),
605 },
606 HydroRoot::SendExternal {
607 to_external_id,
608 to_key,
609 to_many,
610 serialize_fn,
611 instantiate_fn,
612 input,
613 op_metadata,
614 } => HydroRoot::SendExternal {
615 to_external_id: *to_external_id,
616 to_key: *to_key,
617 to_many: *to_many,
618 serialize_fn: serialize_fn.clone(),
619 instantiate_fn: instantiate_fn.clone(),
620 input: Box::new(input.deep_clone(seen_tees)),
621 op_metadata: op_metadata.clone(),
622 },
623 HydroRoot::DestSink {
624 sink,
625 input,
626 op_metadata,
627 } => HydroRoot::DestSink {
628 sink: sink.clone(),
629 input: Box::new(input.deep_clone(seen_tees)),
630 op_metadata: op_metadata.clone(),
631 },
632 HydroRoot::CycleSink {
633 ident,
634 input,
635 out_location,
636 op_metadata,
637 } => HydroRoot::CycleSink {
638 ident: ident.clone(),
639 input: Box::new(input.deep_clone(seen_tees)),
640 out_location: out_location.clone(),
641 op_metadata: op_metadata.clone(),
642 },
643 }
644 }
645
646 #[cfg(feature = "build")]
647 pub fn emit(
648 &mut self,
649 graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
650 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
651 next_stmt_id: &mut usize,
652 ) {
653 self.emit_core(
654 &mut BuildersOrCallback::Builders::<
655 fn(&mut HydroRoot, &mut usize),
656 fn(&mut HydroNode, &mut usize),
657 >(graph_builders),
658 built_tees,
659 next_stmt_id,
660 );
661 }
662
663 #[cfg(feature = "build")]
664 pub fn emit_core(
665 &mut self,
666 builders_or_callback: &mut BuildersOrCallback<
667 impl FnMut(&mut HydroRoot, &mut usize),
668 impl FnMut(&mut HydroNode, &mut usize),
669 >,
670 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
671 next_stmt_id: &mut usize,
672 ) {
673 match self {
674 HydroRoot::ForEach { f, input, .. } => {
675 let (input_ident, input_location_id) =
676 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
677
678 match builders_or_callback {
679 BuildersOrCallback::Builders(graph_builders) => {
680 graph_builders
681 .entry(input_location_id)
682 .or_default()
683 .add_dfir(
684 parse_quote! {
685 #input_ident -> for_each(#f);
686 },
687 None,
688 Some(&next_stmt_id.to_string()),
689 );
690 }
691 BuildersOrCallback::Callback(leaf_callback, _) => {
692 leaf_callback(self, next_stmt_id);
693 }
694 }
695
696 *next_stmt_id += 1;
697 }
698
699 HydroRoot::SendExternal {
700 serialize_fn,
701 instantiate_fn,
702 input,
703 ..
704 } => {
705 let (input_ident, input_location_id) =
706 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
707
708 match builders_or_callback {
709 BuildersOrCallback::Builders(graph_builders) => {
710 let (sink_expr, _) = match instantiate_fn {
711 DebugInstantiate::Building => (
712 syn::parse_quote!(DUMMY_SINK),
713 syn::parse_quote!(DUMMY_SOURCE),
714 ),
715
716 DebugInstantiate::Finalized(finalized) => {
717 (finalized.sink.clone(), finalized.source.clone())
718 }
719 };
720
721 let sender_builder = graph_builders.entry(input_location_id).or_default();
722 if let Some(serialize_fn) = serialize_fn {
723 sender_builder.add_dfir(
724 parse_quote! {
725 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
726 },
727 None,
728 Some(&format!("send{}", next_stmt_id)),
730 );
731 } else {
732 sender_builder.add_dfir(
733 parse_quote! {
734 #input_ident -> dest_sink(#sink_expr);
735 },
736 None,
737 Some(&format!("send{}", next_stmt_id)),
738 );
739 }
740 }
741 BuildersOrCallback::Callback(leaf_callback, _) => {
742 leaf_callback(self, next_stmt_id);
743 }
744 }
745
746 *next_stmt_id += 1;
747 }
748
749 HydroRoot::DestSink { sink, input, .. } => {
750 let (input_ident, input_location_id) =
751 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
752
753 match builders_or_callback {
754 BuildersOrCallback::Builders(graph_builders) => {
755 graph_builders
756 .entry(input_location_id)
757 .or_default()
758 .add_dfir(
759 parse_quote! {
760 #input_ident -> dest_sink(#sink);
761 },
762 None,
763 Some(&next_stmt_id.to_string()),
764 );
765 }
766 BuildersOrCallback::Callback(leaf_callback, _) => {
767 leaf_callback(self, next_stmt_id);
768 }
769 }
770
771 *next_stmt_id += 1;
772 }
773
774 HydroRoot::CycleSink {
775 ident,
776 input,
777 out_location,
778 ..
779 } => {
780 let (input_ident, input_location_id) =
781 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
782
783 let location_id = out_location.root().raw_id();
784
785 match builders_or_callback {
786 BuildersOrCallback::Builders(graph_builders) => {
787 assert_eq!(
788 input_location_id, location_id,
789 "cycle_sink location mismatch"
790 );
791
792 graph_builders.entry(location_id).or_default().add_dfir(
793 parse_quote! {
794 #ident = #input_ident;
795 },
796 None,
797 None,
798 );
799 }
800 BuildersOrCallback::Callback(_, _) => {}
802 }
803 }
804 }
805 }
806
807 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
808 match self {
809 HydroRoot::ForEach { op_metadata, .. }
810 | HydroRoot::SendExternal { op_metadata, .. }
811 | HydroRoot::DestSink { op_metadata, .. }
812 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
813 }
814 }
815
816 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
817 match self {
818 HydroRoot::ForEach { op_metadata, .. }
819 | HydroRoot::SendExternal { op_metadata, .. }
820 | HydroRoot::DestSink { op_metadata, .. }
821 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
822 }
823 }
824
825 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
826 match self {
827 HydroRoot::ForEach { input, .. }
828 | HydroRoot::SendExternal { input, .. }
829 | HydroRoot::DestSink { input, .. }
830 | HydroRoot::CycleSink { input, .. } => {
831 vec![input.metadata()]
832 }
833 }
834 }
835
836 pub fn print_root(&self) -> String {
837 match self {
838 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
839 HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
840 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
841 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
842 }
843 }
844
845 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
846 match self {
847 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
848 transform(f);
849 }
850 HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
851 }
852 }
853}
854
855#[cfg(feature = "build")]
856pub fn emit(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
857 let mut builders = BTreeMap::new();
858 let mut built_tees = HashMap::new();
859 let mut next_stmt_id = 0;
860 for leaf in ir {
861 leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
862 }
863 builders
864}
865
866#[cfg(feature = "build")]
867pub fn traverse_dfir(
868 ir: &mut [HydroRoot],
869 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
870 transform_node: impl FnMut(&mut HydroNode, &mut usize),
871) {
872 let mut seen_tees = HashMap::new();
873 let mut next_stmt_id = 0;
874 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
875 ir.iter_mut().for_each(|leaf| {
876 leaf.emit_core(&mut callback, &mut seen_tees, &mut next_stmt_id);
877 });
878}
879
880pub fn transform_bottom_up(
881 ir: &mut [HydroRoot],
882 transform_root: &mut impl FnMut(&mut HydroRoot),
883 transform_node: &mut impl FnMut(&mut HydroNode),
884 check_well_formed: bool,
885) {
886 let mut seen_tees = HashMap::new();
887 ir.iter_mut().for_each(|leaf| {
888 leaf.transform_bottom_up(
889 transform_root,
890 transform_node,
891 &mut seen_tees,
892 check_well_formed,
893 );
894 });
895}
896
897pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
898 let mut seen_tees = HashMap::new();
899 ir.iter()
900 .map(|leaf| leaf.deep_clone(&mut seen_tees))
901 .collect()
902}
903
904type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
905thread_local! {
906 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
907}
908
909pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
910 PRINTED_TEES.with(|printed_tees| {
911 let mut printed_tees_mut = printed_tees.borrow_mut();
912 *printed_tees_mut = Some((0, HashMap::new()));
913 drop(printed_tees_mut);
914
915 let ret = f();
916
917 let mut printed_tees_mut = printed_tees.borrow_mut();
918 *printed_tees_mut = None;
919
920 ret
921 })
922}
923
924pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
925
926impl TeeNode {
927 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
928 Rc::as_ptr(&self.0)
929 }
930}
931
932impl Debug for TeeNode {
933 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
934 PRINTED_TEES.with(|printed_tees| {
935 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
936 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
937
938 if let Some(printed_tees_mut) = printed_tees_mut {
939 if let Some(existing) = printed_tees_mut
940 .1
941 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
942 {
943 write!(f, "<tee {}>", existing)
944 } else {
945 let next_id = printed_tees_mut.0;
946 printed_tees_mut.0 += 1;
947 printed_tees_mut
948 .1
949 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
950 drop(printed_tees_mut_borrow);
951 write!(f, "<tee {}>: ", next_id)?;
952 Debug::fmt(&self.0.borrow(), f)
953 }
954 } else {
955 drop(printed_tees_mut_borrow);
956 write!(f, "<tee>: ")?;
957 Debug::fmt(&self.0.borrow(), f)
958 }
959 })
960 }
961}
962
963impl Hash for TeeNode {
964 fn hash<H: Hasher>(&self, state: &mut H) {
965 self.0.borrow_mut().hash(state);
966 }
967}
968
969#[derive(Clone)]
970pub struct HydroIrMetadata {
971 pub location_kind: LocationId,
972 pub output_type: Option<DebugType>,
973 pub cardinality: Option<usize>,
974 pub tag: Option<String>,
975 pub op: HydroIrOpMetadata,
976}
977
978impl Hash for HydroIrMetadata {
980 fn hash<H: Hasher>(&self, _: &mut H) {}
981}
982
983impl PartialEq for HydroIrMetadata {
984 fn eq(&self, _: &Self) -> bool {
985 true
986 }
987}
988
989impl Eq for HydroIrMetadata {}
990
991impl Debug for HydroIrMetadata {
992 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
993 f.debug_struct("HydroIrMetadata")
994 .field("location_kind", &self.location_kind)
995 .field("output_type", &self.output_type)
996 .finish()
997 }
998}
999
1000#[derive(Clone)]
1003pub struct HydroIrOpMetadata {
1004 pub backtrace: Backtrace,
1005 pub cpu_usage: Option<f64>,
1006 pub network_recv_cpu_usage: Option<f64>,
1007 pub id: Option<usize>,
1008}
1009
1010impl HydroIrOpMetadata {
1011 #[expect(
1012 clippy::new_without_default,
1013 reason = "explicit calls to new ensure correct backtrace bounds"
1014 )]
1015 pub fn new() -> HydroIrOpMetadata {
1016 Self::new_with_skip(1)
1017 }
1018
1019 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1020 HydroIrOpMetadata {
1021 backtrace: Backtrace::get_backtrace(2 + skip_count),
1022 cpu_usage: None,
1023 network_recv_cpu_usage: None,
1024 id: None,
1025 }
1026 }
1027}
1028
1029impl Debug for HydroIrOpMetadata {
1030 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1031 f.debug_struct("HydroIrOpMetadata").finish()
1032 }
1033}
1034
1035impl Hash for HydroIrOpMetadata {
1036 fn hash<H: Hasher>(&self, _: &mut H) {}
1037}
1038
1039#[derive(Debug, Hash)]
1042pub enum HydroNode {
1043 Placeholder,
1044
1045 Source {
1046 source: HydroSource,
1047 metadata: HydroIrMetadata,
1048 },
1049
1050 CycleSource {
1051 ident: syn::Ident,
1052 metadata: HydroIrMetadata,
1053 },
1054
1055 Tee {
1056 inner: TeeNode,
1057 metadata: HydroIrMetadata,
1058 },
1059
1060 Persist {
1061 inner: Box<HydroNode>,
1062 metadata: HydroIrMetadata,
1063 },
1064
1065 Unpersist {
1066 inner: Box<HydroNode>,
1067 metadata: HydroIrMetadata,
1068 },
1069
1070 Delta {
1071 inner: Box<HydroNode>,
1072 metadata: HydroIrMetadata,
1073 },
1074
1075 Chain {
1076 first: Box<HydroNode>,
1077 second: Box<HydroNode>,
1078 metadata: HydroIrMetadata,
1079 },
1080
1081 ChainFirst {
1082 first: Box<HydroNode>,
1083 second: Box<HydroNode>,
1084 metadata: HydroIrMetadata,
1085 },
1086
1087 CrossProduct {
1088 left: Box<HydroNode>,
1089 right: Box<HydroNode>,
1090 metadata: HydroIrMetadata,
1091 },
1092
1093 CrossSingleton {
1094 left: Box<HydroNode>,
1095 right: Box<HydroNode>,
1096 metadata: HydroIrMetadata,
1097 },
1098
1099 Join {
1100 left: Box<HydroNode>,
1101 right: Box<HydroNode>,
1102 metadata: HydroIrMetadata,
1103 },
1104
1105 Difference {
1106 pos: Box<HydroNode>,
1107 neg: Box<HydroNode>,
1108 metadata: HydroIrMetadata,
1109 },
1110
1111 AntiJoin {
1112 pos: Box<HydroNode>,
1113 neg: Box<HydroNode>,
1114 metadata: HydroIrMetadata,
1115 },
1116
1117 ResolveFutures {
1118 input: Box<HydroNode>,
1119 metadata: HydroIrMetadata,
1120 },
1121 ResolveFuturesOrdered {
1122 input: Box<HydroNode>,
1123 metadata: HydroIrMetadata,
1124 },
1125
1126 Map {
1127 f: DebugExpr,
1128 input: Box<HydroNode>,
1129 metadata: HydroIrMetadata,
1130 },
1131 FlatMap {
1132 f: DebugExpr,
1133 input: Box<HydroNode>,
1134 metadata: HydroIrMetadata,
1135 },
1136 Filter {
1137 f: DebugExpr,
1138 input: Box<HydroNode>,
1139 metadata: HydroIrMetadata,
1140 },
1141 FilterMap {
1142 f: DebugExpr,
1143 input: Box<HydroNode>,
1144 metadata: HydroIrMetadata,
1145 },
1146
1147 DeferTick {
1148 input: Box<HydroNode>,
1149 metadata: HydroIrMetadata,
1150 },
1151 Enumerate {
1152 is_static: bool,
1153 input: Box<HydroNode>,
1154 metadata: HydroIrMetadata,
1155 },
1156 Inspect {
1157 f: DebugExpr,
1158 input: Box<HydroNode>,
1159 metadata: HydroIrMetadata,
1160 },
1161
1162 Unique {
1163 input: Box<HydroNode>,
1164 metadata: HydroIrMetadata,
1165 },
1166
1167 Sort {
1168 input: Box<HydroNode>,
1169 metadata: HydroIrMetadata,
1170 },
1171 Fold {
1172 init: DebugExpr,
1173 acc: DebugExpr,
1174 input: Box<HydroNode>,
1175 metadata: HydroIrMetadata,
1176 },
1177
1178 Scan {
1179 init: DebugExpr,
1180 acc: DebugExpr,
1181 input: Box<HydroNode>,
1182 metadata: HydroIrMetadata,
1183 },
1184 FoldKeyed {
1185 init: DebugExpr,
1186 acc: DebugExpr,
1187 input: Box<HydroNode>,
1188 metadata: HydroIrMetadata,
1189 },
1190
1191 Reduce {
1192 f: DebugExpr,
1193 input: Box<HydroNode>,
1194 metadata: HydroIrMetadata,
1195 },
1196 ReduceKeyed {
1197 f: DebugExpr,
1198 input: Box<HydroNode>,
1199 metadata: HydroIrMetadata,
1200 },
1201 ReduceKeyedWatermark {
1202 f: DebugExpr,
1203 input: Box<HydroNode>,
1204 watermark: Box<HydroNode>,
1205 metadata: HydroIrMetadata,
1206 },
1207
1208 Network {
1209 serialize_fn: Option<DebugExpr>,
1210 instantiate_fn: DebugInstantiate,
1211 deserialize_fn: Option<DebugExpr>,
1212 input: Box<HydroNode>,
1213 metadata: HydroIrMetadata,
1214 },
1215
1216 ExternalInput {
1217 from_external_id: usize,
1218 from_key: usize,
1219 from_many: bool,
1220 codec_type: DebugType,
1221 port_hint: NetworkHint,
1222 instantiate_fn: DebugInstantiate,
1223 deserialize_fn: Option<DebugExpr>,
1224 metadata: HydroIrMetadata,
1225 },
1226
1227 Counter {
1228 tag: String,
1229 duration: DebugExpr,
1230 prefix: String,
1231 input: Box<HydroNode>,
1232 metadata: HydroIrMetadata,
1233 },
1234}
1235
1236pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1237pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1238
1239impl HydroNode {
1240 pub fn transform_bottom_up(
1241 &mut self,
1242 transform: &mut impl FnMut(&mut HydroNode),
1243 seen_tees: &mut SeenTees,
1244 check_well_formed: bool,
1245 ) {
1246 self.transform_children(
1247 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1248 seen_tees,
1249 );
1250
1251 transform(self);
1252
1253 let self_location = self.metadata().location_kind.root();
1254
1255 if check_well_formed {
1256 match &*self {
1257 HydroNode::Network { .. } => {}
1258 _ => {
1259 self.input_metadata().iter().for_each(|i| {
1260 if i.location_kind.root() != self_location {
1261 panic!(
1262 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1263 i,
1264 i.location_kind.root(),
1265 self,
1266 self_location
1267 )
1268 }
1269 });
1270 }
1271 }
1272 }
1273 }
1274
1275 #[inline(always)]
1276 pub fn transform_children(
1277 &mut self,
1278 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1279 seen_tees: &mut SeenTees,
1280 ) {
1281 match self {
1282 HydroNode::Placeholder => {
1283 panic!();
1284 }
1285
1286 HydroNode::Source { .. }
1287 | HydroNode::CycleSource { .. }
1288 | HydroNode::ExternalInput { .. } => {}
1289
1290 HydroNode::Tee { inner, .. } => {
1291 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1292 *inner = TeeNode(transformed.clone());
1293 } else {
1294 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1295 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1296 let mut orig = inner.0.replace(HydroNode::Placeholder);
1297 transform(&mut orig, seen_tees);
1298 *transformed_cell.borrow_mut() = orig;
1299 *inner = TeeNode(transformed_cell);
1300 }
1301 }
1302
1303 HydroNode::Persist { inner, .. }
1304 | HydroNode::Unpersist { inner, .. }
1305 | HydroNode::Delta { inner, .. } => {
1306 transform(inner.as_mut(), seen_tees);
1307 }
1308
1309 HydroNode::Chain { first, second, .. } => {
1310 transform(first.as_mut(), seen_tees);
1311 transform(second.as_mut(), seen_tees);
1312 }
1313
1314 HydroNode::ChainFirst { first, second, .. } => {
1315 transform(first.as_mut(), seen_tees);
1316 transform(second.as_mut(), seen_tees);
1317 }
1318
1319 HydroNode::CrossSingleton { left, right, .. }
1320 | HydroNode::CrossProduct { left, right, .. }
1321 | HydroNode::Join { left, right, .. } => {
1322 transform(left.as_mut(), seen_tees);
1323 transform(right.as_mut(), seen_tees);
1324 }
1325
1326 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1327 transform(pos.as_mut(), seen_tees);
1328 transform(neg.as_mut(), seen_tees);
1329 }
1330
1331 HydroNode::ReduceKeyedWatermark {
1332 input, watermark, ..
1333 } => {
1334 transform(input.as_mut(), seen_tees);
1335 transform(watermark.as_mut(), seen_tees);
1336 }
1337
1338 HydroNode::Map { input, .. }
1339 | HydroNode::ResolveFutures { input, .. }
1340 | HydroNode::ResolveFuturesOrdered { input, .. }
1341 | HydroNode::FlatMap { input, .. }
1342 | HydroNode::Filter { input, .. }
1343 | HydroNode::FilterMap { input, .. }
1344 | HydroNode::Sort { input, .. }
1345 | HydroNode::DeferTick { input, .. }
1346 | HydroNode::Enumerate { input, .. }
1347 | HydroNode::Inspect { input, .. }
1348 | HydroNode::Unique { input, .. }
1349 | HydroNode::Network { input, .. }
1350 | HydroNode::Fold { input, .. }
1351 | HydroNode::Scan { input, .. }
1352 | HydroNode::FoldKeyed { input, .. }
1353 | HydroNode::Reduce { input, .. }
1354 | HydroNode::ReduceKeyed { input, .. }
1355 | HydroNode::Counter { input, .. } => {
1356 transform(input.as_mut(), seen_tees);
1357 }
1358 }
1359 }
1360
1361 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1362 match self {
1363 HydroNode::Placeholder => HydroNode::Placeholder,
1364 HydroNode::Source { source, metadata } => HydroNode::Source {
1365 source: source.clone(),
1366 metadata: metadata.clone(),
1367 },
1368 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1369 ident: ident.clone(),
1370 metadata: metadata.clone(),
1371 },
1372 HydroNode::Tee { inner, metadata } => {
1373 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1374 HydroNode::Tee {
1375 inner: TeeNode(transformed.clone()),
1376 metadata: metadata.clone(),
1377 }
1378 } else {
1379 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1380 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1381 let cloned = inner.0.borrow().deep_clone(seen_tees);
1382 *new_rc.borrow_mut() = cloned;
1383 HydroNode::Tee {
1384 inner: TeeNode(new_rc),
1385 metadata: metadata.clone(),
1386 }
1387 }
1388 }
1389 HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1390 inner: Box::new(inner.deep_clone(seen_tees)),
1391 metadata: metadata.clone(),
1392 },
1393 HydroNode::Unpersist { inner, metadata } => HydroNode::Unpersist {
1394 inner: Box::new(inner.deep_clone(seen_tees)),
1395 metadata: metadata.clone(),
1396 },
1397 HydroNode::Delta { inner, metadata } => HydroNode::Delta {
1398 inner: Box::new(inner.deep_clone(seen_tees)),
1399 metadata: metadata.clone(),
1400 },
1401 HydroNode::Chain {
1402 first,
1403 second,
1404 metadata,
1405 } => HydroNode::Chain {
1406 first: Box::new(first.deep_clone(seen_tees)),
1407 second: Box::new(second.deep_clone(seen_tees)),
1408 metadata: metadata.clone(),
1409 },
1410 HydroNode::ChainFirst {
1411 first,
1412 second,
1413 metadata,
1414 } => HydroNode::ChainFirst {
1415 first: Box::new(first.deep_clone(seen_tees)),
1416 second: Box::new(second.deep_clone(seen_tees)),
1417 metadata: metadata.clone(),
1418 },
1419 HydroNode::CrossProduct {
1420 left,
1421 right,
1422 metadata,
1423 } => HydroNode::CrossProduct {
1424 left: Box::new(left.deep_clone(seen_tees)),
1425 right: Box::new(right.deep_clone(seen_tees)),
1426 metadata: metadata.clone(),
1427 },
1428 HydroNode::CrossSingleton {
1429 left,
1430 right,
1431 metadata,
1432 } => HydroNode::CrossSingleton {
1433 left: Box::new(left.deep_clone(seen_tees)),
1434 right: Box::new(right.deep_clone(seen_tees)),
1435 metadata: metadata.clone(),
1436 },
1437 HydroNode::Join {
1438 left,
1439 right,
1440 metadata,
1441 } => HydroNode::Join {
1442 left: Box::new(left.deep_clone(seen_tees)),
1443 right: Box::new(right.deep_clone(seen_tees)),
1444 metadata: metadata.clone(),
1445 },
1446 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1447 pos: Box::new(pos.deep_clone(seen_tees)),
1448 neg: Box::new(neg.deep_clone(seen_tees)),
1449 metadata: metadata.clone(),
1450 },
1451 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1452 pos: Box::new(pos.deep_clone(seen_tees)),
1453 neg: Box::new(neg.deep_clone(seen_tees)),
1454 metadata: metadata.clone(),
1455 },
1456 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1457 input: Box::new(input.deep_clone(seen_tees)),
1458 metadata: metadata.clone(),
1459 },
1460 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1461 HydroNode::ResolveFuturesOrdered {
1462 input: Box::new(input.deep_clone(seen_tees)),
1463 metadata: metadata.clone(),
1464 }
1465 }
1466 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1467 f: f.clone(),
1468 input: Box::new(input.deep_clone(seen_tees)),
1469 metadata: metadata.clone(),
1470 },
1471 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1472 f: f.clone(),
1473 input: Box::new(input.deep_clone(seen_tees)),
1474 metadata: metadata.clone(),
1475 },
1476 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1477 f: f.clone(),
1478 input: Box::new(input.deep_clone(seen_tees)),
1479 metadata: metadata.clone(),
1480 },
1481 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1482 f: f.clone(),
1483 input: Box::new(input.deep_clone(seen_tees)),
1484 metadata: metadata.clone(),
1485 },
1486 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1487 input: Box::new(input.deep_clone(seen_tees)),
1488 metadata: metadata.clone(),
1489 },
1490 HydroNode::Enumerate {
1491 is_static,
1492 input,
1493 metadata,
1494 } => HydroNode::Enumerate {
1495 is_static: *is_static,
1496 input: Box::new(input.deep_clone(seen_tees)),
1497 metadata: metadata.clone(),
1498 },
1499 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1500 f: f.clone(),
1501 input: Box::new(input.deep_clone(seen_tees)),
1502 metadata: metadata.clone(),
1503 },
1504 HydroNode::Unique { input, metadata } => HydroNode::Unique {
1505 input: Box::new(input.deep_clone(seen_tees)),
1506 metadata: metadata.clone(),
1507 },
1508 HydroNode::Sort { input, metadata } => HydroNode::Sort {
1509 input: Box::new(input.deep_clone(seen_tees)),
1510 metadata: metadata.clone(),
1511 },
1512 HydroNode::Fold {
1513 init,
1514 acc,
1515 input,
1516 metadata,
1517 } => HydroNode::Fold {
1518 init: init.clone(),
1519 acc: acc.clone(),
1520 input: Box::new(input.deep_clone(seen_tees)),
1521 metadata: metadata.clone(),
1522 },
1523 HydroNode::Scan {
1524 init,
1525 acc,
1526 input,
1527 metadata,
1528 } => HydroNode::Scan {
1529 init: init.clone(),
1530 acc: acc.clone(),
1531 input: Box::new(input.deep_clone(seen_tees)),
1532 metadata: metadata.clone(),
1533 },
1534 HydroNode::FoldKeyed {
1535 init,
1536 acc,
1537 input,
1538 metadata,
1539 } => HydroNode::FoldKeyed {
1540 init: init.clone(),
1541 acc: acc.clone(),
1542 input: Box::new(input.deep_clone(seen_tees)),
1543 metadata: metadata.clone(),
1544 },
1545 HydroNode::ReduceKeyedWatermark {
1546 f,
1547 input,
1548 watermark,
1549 metadata,
1550 } => HydroNode::ReduceKeyedWatermark {
1551 f: f.clone(),
1552 input: Box::new(input.deep_clone(seen_tees)),
1553 watermark: Box::new(watermark.deep_clone(seen_tees)),
1554 metadata: metadata.clone(),
1555 },
1556 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1557 f: f.clone(),
1558 input: Box::new(input.deep_clone(seen_tees)),
1559 metadata: metadata.clone(),
1560 },
1561 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1562 f: f.clone(),
1563 input: Box::new(input.deep_clone(seen_tees)),
1564 metadata: metadata.clone(),
1565 },
1566 HydroNode::Network {
1567 serialize_fn,
1568 instantiate_fn,
1569 deserialize_fn,
1570 input,
1571 metadata,
1572 } => HydroNode::Network {
1573 serialize_fn: serialize_fn.clone(),
1574 instantiate_fn: instantiate_fn.clone(),
1575 deserialize_fn: deserialize_fn.clone(),
1576 input: Box::new(input.deep_clone(seen_tees)),
1577 metadata: metadata.clone(),
1578 },
1579 HydroNode::ExternalInput {
1580 from_external_id,
1581 from_key,
1582 from_many,
1583 codec_type,
1584 port_hint,
1585 instantiate_fn,
1586 deserialize_fn,
1587 metadata,
1588 } => HydroNode::ExternalInput {
1589 from_external_id: *from_external_id,
1590 from_key: *from_key,
1591 from_many: *from_many,
1592 codec_type: codec_type.clone(),
1593 port_hint: *port_hint,
1594 instantiate_fn: instantiate_fn.clone(),
1595 deserialize_fn: deserialize_fn.clone(),
1596 metadata: metadata.clone(),
1597 },
1598 HydroNode::Counter {
1599 tag,
1600 duration,
1601 prefix,
1602 input,
1603 metadata,
1604 } => HydroNode::Counter {
1605 tag: tag.clone(),
1606 duration: duration.clone(),
1607 prefix: prefix.clone(),
1608 input: Box::new(input.deep_clone(seen_tees)),
1609 metadata: metadata.clone(),
1610 },
1611 }
1612 }
1613
1614 #[cfg(feature = "build")]
1615 pub fn emit_core(
1616 &mut self,
1617 builders_or_callback: &mut BuildersOrCallback<
1618 impl FnMut(&mut HydroRoot, &mut usize),
1619 impl FnMut(&mut HydroNode, &mut usize),
1620 >,
1621 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1622 next_stmt_id: &mut usize,
1623 ) -> (syn::Ident, usize) {
1624 match self {
1625 HydroNode::Placeholder => {
1626 panic!()
1627 }
1628
1629 HydroNode::Persist { inner, .. } => {
1630 let (inner_ident, location) =
1631 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1632
1633 let persist_ident =
1634 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1635
1636 match builders_or_callback {
1637 BuildersOrCallback::Builders(graph_builders) => {
1638 let builder = graph_builders.entry(location).or_default();
1639 builder.add_dfir(
1640 parse_quote! {
1641 #persist_ident = #inner_ident -> persist::<'static>();
1642 },
1643 None,
1644 Some(&next_stmt_id.to_string()),
1645 );
1646 }
1647 BuildersOrCallback::Callback(_, node_callback) => {
1648 node_callback(self, next_stmt_id);
1649 }
1650 }
1651
1652 *next_stmt_id += 1;
1653
1654 (persist_ident, location)
1655 }
1656
1657 HydroNode::Unpersist { .. } => {
1658 panic!(
1659 "Unpersist is a marker node and should have been optimized away. This is likely a compiler bug."
1660 )
1661 }
1662
1663 HydroNode::Delta { inner, .. } => {
1664 let (inner_ident, location) =
1665 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1666
1667 let delta_ident =
1668 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1669
1670 match builders_or_callback {
1671 BuildersOrCallback::Builders(graph_builders) => {
1672 let builder = graph_builders.entry(location).or_default();
1673 builder.add_dfir(
1674 parse_quote! {
1675 #delta_ident = #inner_ident -> multiset_delta();
1676 },
1677 None,
1678 Some(&next_stmt_id.to_string()),
1679 );
1680 }
1681 BuildersOrCallback::Callback(_, node_callback) => {
1682 node_callback(self, next_stmt_id);
1683 }
1684 }
1685
1686 *next_stmt_id += 1;
1687
1688 (delta_ident, location)
1689 }
1690
1691 HydroNode::Source {
1692 source, metadata, ..
1693 } => {
1694 let location_id = metadata.location_kind.root().raw_id();
1695
1696 if let HydroSource::ExternalNetwork() = source {
1697 (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1698 } else {
1699 let source_ident =
1700 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1701
1702 let source_stmt = match source {
1703 HydroSource::Stream(expr) => {
1704 parse_quote! {
1705 #source_ident = source_stream(#expr);
1706 }
1707 }
1708
1709 HydroSource::ExternalNetwork() => {
1710 unreachable!()
1711 }
1712
1713 HydroSource::Iter(expr) => {
1714 parse_quote! {
1715 #source_ident = source_iter(#expr);
1716 }
1717 }
1718
1719 HydroSource::Spin() => {
1720 parse_quote! {
1721 #source_ident = spin();
1722 }
1723 }
1724 };
1725
1726 match builders_or_callback {
1727 BuildersOrCallback::Builders(graph_builders) => {
1728 let builder = graph_builders.entry(location_id).or_default();
1729 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1730 }
1731 BuildersOrCallback::Callback(_, node_callback) => {
1732 node_callback(self, next_stmt_id);
1733 }
1734 }
1735
1736 *next_stmt_id += 1;
1737
1738 (source_ident, location_id)
1739 }
1740 }
1741
1742 HydroNode::CycleSource {
1743 ident, metadata, ..
1744 } => {
1745 let location_id = metadata.location_kind.root().raw_id();
1746
1747 let ident = ident.clone();
1748
1749 match builders_or_callback {
1750 BuildersOrCallback::Builders(_) => {}
1751 BuildersOrCallback::Callback(_, node_callback) => {
1752 node_callback(self, next_stmt_id);
1753 }
1754 }
1755
1756 *next_stmt_id += 1;
1758
1759 (ident, location_id)
1760 }
1761
1762 HydroNode::Tee { inner, .. } => {
1763 let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1764 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1765 {
1766 match builders_or_callback {
1767 BuildersOrCallback::Builders(_) => {}
1768 BuildersOrCallback::Callback(_, node_callback) => {
1769 node_callback(self, next_stmt_id);
1770 }
1771 }
1772
1773 (teed_from.clone(), *inner_location_id)
1774 } else {
1775 let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1776 builders_or_callback,
1777 built_tees,
1778 next_stmt_id,
1779 );
1780
1781 let tee_ident =
1782 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1783
1784 built_tees.insert(
1785 inner.0.as_ref() as *const RefCell<HydroNode>,
1786 (tee_ident.clone(), inner_location_id),
1787 );
1788
1789 match builders_or_callback {
1790 BuildersOrCallback::Builders(graph_builders) => {
1791 let builder = graph_builders.entry(inner_location_id).or_default();
1792 builder.add_dfir(
1793 parse_quote! {
1794 #tee_ident = #inner_ident -> tee();
1795 },
1796 None,
1797 Some(&next_stmt_id.to_string()),
1798 );
1799 }
1800 BuildersOrCallback::Callback(_, node_callback) => {
1801 node_callback(self, next_stmt_id);
1802 }
1803 }
1804
1805 (tee_ident, inner_location_id)
1806 };
1807
1808 *next_stmt_id += 1;
1812 (ret_ident, inner_location_id)
1813 }
1814
1815 HydroNode::Chain { first, second, .. } => {
1816 let (first_ident, first_location_id) =
1817 first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1818 let (second_ident, second_location_id) =
1819 second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1820
1821 let chain_ident =
1822 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1823
1824 match builders_or_callback {
1825 BuildersOrCallback::Builders(graph_builders) => {
1826 assert_eq!(
1827 first_location_id, second_location_id,
1828 "chain inputs must be in the same location"
1829 );
1830 let builder = graph_builders.entry(first_location_id).or_default();
1831 builder.add_dfir(
1832 parse_quote! {
1833 #chain_ident = chain();
1834 #first_ident -> [0]#chain_ident;
1835 #second_ident -> [1]#chain_ident;
1836 },
1837 None,
1838 Some(&next_stmt_id.to_string()),
1839 );
1840 }
1841 BuildersOrCallback::Callback(_, node_callback) => {
1842 node_callback(self, next_stmt_id);
1843 }
1844 }
1845
1846 *next_stmt_id += 1;
1847
1848 (chain_ident, first_location_id)
1849 }
1850
1851 HydroNode::ChainFirst { first, second, .. } => {
1852 let (first_ident, first_location_id) =
1853 first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1854 let (second_ident, second_location_id) =
1855 second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1856
1857 let chain_ident =
1858 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1859
1860 match builders_or_callback {
1861 BuildersOrCallback::Builders(graph_builders) => {
1862 assert_eq!(
1863 first_location_id, second_location_id,
1864 "chain inputs must be in the same location"
1865 );
1866 let builder = graph_builders.entry(first_location_id).or_default();
1867 builder.add_dfir(
1868 parse_quote! {
1869 #chain_ident = chain_first_n(1);
1870 #first_ident -> [0]#chain_ident;
1871 #second_ident -> [1]#chain_ident;
1872 },
1873 None,
1874 Some(&next_stmt_id.to_string()),
1875 );
1876 }
1877 BuildersOrCallback::Callback(_, node_callback) => {
1878 node_callback(self, next_stmt_id);
1879 }
1880 }
1881
1882 *next_stmt_id += 1;
1883
1884 (chain_ident, first_location_id)
1885 }
1886
1887 HydroNode::CrossSingleton { left, right, .. } => {
1888 let (left_ident, left_location_id) =
1889 left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1890 let (right_ident, right_location_id) =
1891 right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1892
1893 let cross_ident =
1894 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1895
1896 match builders_or_callback {
1897 BuildersOrCallback::Builders(graph_builders) => {
1898 assert_eq!(
1899 left_location_id, right_location_id,
1900 "cross_singleton inputs must be in the same location"
1901 );
1902
1903 let builder = graph_builders.entry(left_location_id).or_default();
1904 builder.add_dfir(
1905 parse_quote! {
1906 #cross_ident = cross_singleton();
1907 #left_ident -> [input]#cross_ident;
1908 #right_ident -> [single]#cross_ident;
1909 },
1910 None,
1911 Some(&next_stmt_id.to_string()),
1912 );
1913 }
1914 BuildersOrCallback::Callback(_, node_callback) => {
1915 node_callback(self, next_stmt_id);
1916 }
1917 }
1918
1919 *next_stmt_id += 1;
1920
1921 (cross_ident, left_location_id)
1922 }
1923
1924 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1925 let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1926 parse_quote!(cross_join_multiset)
1927 } else {
1928 parse_quote!(join_multiset)
1929 };
1930
1931 let (HydroNode::CrossProduct { left, right, .. }
1932 | HydroNode::Join { left, right, .. }) = self
1933 else {
1934 unreachable!()
1935 };
1936
1937 let (left_inner, left_lifetime) =
1938 if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1939 (left, quote!('static))
1940 } else {
1941 (left, quote!('tick))
1942 };
1943
1944 let (right_inner, right_lifetime) =
1945 if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
1946 (right, quote!('static))
1947 } else {
1948 (right, quote!('tick))
1949 };
1950
1951 let (left_ident, left_location_id) =
1952 left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1953 let (right_ident, right_location_id) =
1954 right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1955
1956 let stream_ident =
1957 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1958
1959 match builders_or_callback {
1960 BuildersOrCallback::Builders(graph_builders) => {
1961 assert_eq!(
1962 left_location_id, right_location_id,
1963 "join / cross product inputs must be in the same location"
1964 );
1965
1966 let builder = graph_builders.entry(left_location_id).or_default();
1967 builder.add_dfir(
1968 parse_quote! {
1969 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
1970 #left_ident -> [0]#stream_ident;
1971 #right_ident -> [1]#stream_ident;
1972 },
1973 None,
1974 Some(&next_stmt_id.to_string()),
1975 );
1976 }
1977 BuildersOrCallback::Callback(_, node_callback) => {
1978 node_callback(self, next_stmt_id);
1979 }
1980 }
1981
1982 *next_stmt_id += 1;
1983
1984 (stream_ident, left_location_id)
1985 }
1986
1987 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
1988 let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
1989 parse_quote!(difference_multiset)
1990 } else {
1991 parse_quote!(anti_join_multiset)
1992 };
1993
1994 let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
1995 self
1996 else {
1997 unreachable!()
1998 };
1999
2000 let (neg, neg_lifetime) =
2001 if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
2002 (neg, quote!('static))
2003 } else {
2004 (neg, quote!('tick))
2005 };
2006
2007 let (pos_ident, pos_location_id) =
2008 pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
2009 let (neg_ident, neg_location_id) =
2010 neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
2011
2012 let stream_ident =
2013 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2014
2015 match builders_or_callback {
2016 BuildersOrCallback::Builders(graph_builders) => {
2017 assert_eq!(
2018 pos_location_id, neg_location_id,
2019 "difference / anti join inputs must be in the same location"
2020 );
2021
2022 let builder = graph_builders.entry(pos_location_id).or_default();
2023 builder.add_dfir(
2024 parse_quote! {
2025 #stream_ident = #operator::<'tick, #neg_lifetime>();
2026 #pos_ident -> [pos]#stream_ident;
2027 #neg_ident -> [neg]#stream_ident;
2028 },
2029 None,
2030 Some(&next_stmt_id.to_string()),
2031 );
2032 }
2033 BuildersOrCallback::Callback(_, node_callback) => {
2034 node_callback(self, next_stmt_id);
2035 }
2036 }
2037
2038 *next_stmt_id += 1;
2039
2040 (stream_ident, pos_location_id)
2041 }
2042
2043 HydroNode::ResolveFutures { input, .. } => {
2044 let (input_ident, input_location_id) =
2045 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2046
2047 let futures_ident =
2048 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2049
2050 match builders_or_callback {
2051 BuildersOrCallback::Builders(graph_builders) => {
2052 let builder = graph_builders.entry(input_location_id).or_default();
2053 builder.add_dfir(
2054 parse_quote! {
2055 #futures_ident = #input_ident -> resolve_futures();
2056 },
2057 None,
2058 Some(&next_stmt_id.to_string()),
2059 );
2060 }
2061 BuildersOrCallback::Callback(_, node_callback) => {
2062 node_callback(self, next_stmt_id);
2063 }
2064 }
2065
2066 *next_stmt_id += 1;
2067
2068 (futures_ident, input_location_id)
2069 }
2070
2071 HydroNode::ResolveFuturesOrdered { input, .. } => {
2072 let (input_ident, input_location_id) =
2073 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2074
2075 let futures_ident =
2076 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2077
2078 match builders_or_callback {
2079 BuildersOrCallback::Builders(graph_builders) => {
2080 let builder = graph_builders.entry(input_location_id).or_default();
2081 builder.add_dfir(
2082 parse_quote! {
2083 #futures_ident = #input_ident -> resolve_futures_ordered();
2084 },
2085 None,
2086 Some(&next_stmt_id.to_string()),
2087 );
2088 }
2089 BuildersOrCallback::Callback(_, node_callback) => {
2090 node_callback(self, next_stmt_id);
2091 }
2092 }
2093
2094 *next_stmt_id += 1;
2095
2096 (futures_ident, input_location_id)
2097 }
2098
2099 HydroNode::Map { f, input, .. } => {
2100 let (input_ident, input_location_id) =
2101 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2102
2103 let map_ident =
2104 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2105
2106 match builders_or_callback {
2107 BuildersOrCallback::Builders(graph_builders) => {
2108 let builder = graph_builders.entry(input_location_id).or_default();
2109 builder.add_dfir(
2110 parse_quote! {
2111 #map_ident = #input_ident -> map(#f);
2112 },
2113 None,
2114 Some(&next_stmt_id.to_string()),
2115 );
2116 }
2117 BuildersOrCallback::Callback(_, node_callback) => {
2118 node_callback(self, next_stmt_id);
2119 }
2120 }
2121
2122 *next_stmt_id += 1;
2123
2124 (map_ident, input_location_id)
2125 }
2126
2127 HydroNode::FlatMap { f, input, .. } => {
2128 let (input_ident, input_location_id) =
2129 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2130
2131 let flat_map_ident =
2132 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2133
2134 match builders_or_callback {
2135 BuildersOrCallback::Builders(graph_builders) => {
2136 let builder = graph_builders.entry(input_location_id).or_default();
2137 builder.add_dfir(
2138 parse_quote! {
2139 #flat_map_ident = #input_ident -> flat_map(#f);
2140 },
2141 None,
2142 Some(&next_stmt_id.to_string()),
2143 );
2144 }
2145 BuildersOrCallback::Callback(_, node_callback) => {
2146 node_callback(self, next_stmt_id);
2147 }
2148 }
2149
2150 *next_stmt_id += 1;
2151
2152 (flat_map_ident, input_location_id)
2153 }
2154
2155 HydroNode::Filter { f, input, .. } => {
2156 let (input_ident, input_location_id) =
2157 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2158
2159 let filter_ident =
2160 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2161
2162 match builders_or_callback {
2163 BuildersOrCallback::Builders(graph_builders) => {
2164 let builder = graph_builders.entry(input_location_id).or_default();
2165 builder.add_dfir(
2166 parse_quote! {
2167 #filter_ident = #input_ident -> filter(#f);
2168 },
2169 None,
2170 Some(&next_stmt_id.to_string()),
2171 );
2172 }
2173 BuildersOrCallback::Callback(_, node_callback) => {
2174 node_callback(self, next_stmt_id);
2175 }
2176 }
2177
2178 *next_stmt_id += 1;
2179
2180 (filter_ident, input_location_id)
2181 }
2182
2183 HydroNode::FilterMap { f, input, .. } => {
2184 let (input_ident, input_location_id) =
2185 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2186
2187 let filter_map_ident =
2188 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2189
2190 match builders_or_callback {
2191 BuildersOrCallback::Builders(graph_builders) => {
2192 let builder = graph_builders.entry(input_location_id).or_default();
2193 builder.add_dfir(
2194 parse_quote! {
2195 #filter_map_ident = #input_ident -> filter_map(#f);
2196 },
2197 None,
2198 Some(&next_stmt_id.to_string()),
2199 );
2200 }
2201 BuildersOrCallback::Callback(_, node_callback) => {
2202 node_callback(self, next_stmt_id);
2203 }
2204 }
2205
2206 *next_stmt_id += 1;
2207
2208 (filter_map_ident, input_location_id)
2209 }
2210
2211 HydroNode::Sort { input, .. } => {
2212 let (input_ident, input_location_id) =
2213 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2214
2215 let sort_ident =
2216 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2217
2218 match builders_or_callback {
2219 BuildersOrCallback::Builders(graph_builders) => {
2220 let builder = graph_builders.entry(input_location_id).or_default();
2221 builder.add_dfir(
2222 parse_quote! {
2223 #sort_ident = #input_ident -> sort();
2224 },
2225 None,
2226 Some(&next_stmt_id.to_string()),
2227 );
2228 }
2229 BuildersOrCallback::Callback(_, node_callback) => {
2230 node_callback(self, next_stmt_id);
2231 }
2232 }
2233
2234 *next_stmt_id += 1;
2235
2236 (sort_ident, input_location_id)
2237 }
2238
2239 HydroNode::DeferTick { input, .. } => {
2240 let (input_ident, input_location_id) =
2241 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2242
2243 let defer_tick_ident =
2244 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2245
2246 match builders_or_callback {
2247 BuildersOrCallback::Builders(graph_builders) => {
2248 let builder = graph_builders.entry(input_location_id).or_default();
2249 builder.add_dfir(
2250 parse_quote! {
2251 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2252 },
2253 None,
2254 Some(&next_stmt_id.to_string()),
2255 );
2256 }
2257 BuildersOrCallback::Callback(_, node_callback) => {
2258 node_callback(self, next_stmt_id);
2259 }
2260 }
2261
2262 *next_stmt_id += 1;
2263
2264 (defer_tick_ident, input_location_id)
2265 }
2266
2267 HydroNode::Enumerate {
2268 is_static, input, ..
2269 } => {
2270 let (input_ident, input_location_id) =
2271 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2272
2273 let enumerate_ident =
2274 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2275
2276 match builders_or_callback {
2277 BuildersOrCallback::Builders(graph_builders) => {
2278 let builder = graph_builders.entry(input_location_id).or_default();
2279 let lifetime = if *is_static {
2280 quote!('static)
2281 } else {
2282 quote!('tick)
2283 };
2284 builder.add_dfir(
2285 parse_quote! {
2286 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2287 },
2288 None,
2289 Some(&next_stmt_id.to_string()),
2290 );
2291 }
2292 BuildersOrCallback::Callback(_, node_callback) => {
2293 node_callback(self, next_stmt_id);
2294 }
2295 }
2296
2297 *next_stmt_id += 1;
2298
2299 (enumerate_ident, input_location_id)
2300 }
2301
2302 HydroNode::Inspect { f, input, .. } => {
2303 let (input_ident, input_location_id) =
2304 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2305
2306 let inspect_ident =
2307 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2308
2309 match builders_or_callback {
2310 BuildersOrCallback::Builders(graph_builders) => {
2311 let builder = graph_builders.entry(input_location_id).or_default();
2312 builder.add_dfir(
2313 parse_quote! {
2314 #inspect_ident = #input_ident -> inspect(#f);
2315 },
2316 None,
2317 Some(&next_stmt_id.to_string()),
2318 );
2319 }
2320 BuildersOrCallback::Callback(_, node_callback) => {
2321 node_callback(self, next_stmt_id);
2322 }
2323 }
2324
2325 *next_stmt_id += 1;
2326
2327 (inspect_ident, input_location_id)
2328 }
2329
2330 HydroNode::Unique { input, .. } => {
2331 let (input_ident, input_location_id) =
2332 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2333
2334 let unique_ident =
2335 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2336
2337 match builders_or_callback {
2338 BuildersOrCallback::Builders(graph_builders) => {
2339 let builder = graph_builders.entry(input_location_id).or_default();
2340 builder.add_dfir(
2341 parse_quote! {
2342 #unique_ident = #input_ident -> unique::<'tick>();
2343 },
2344 None,
2345 Some(&next_stmt_id.to_string()),
2346 );
2347 }
2348 BuildersOrCallback::Callback(_, node_callback) => {
2349 node_callback(self, next_stmt_id);
2350 }
2351 }
2352
2353 *next_stmt_id += 1;
2354
2355 (unique_ident, input_location_id)
2356 }
2357
2358 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2359 let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2360 parse_quote!(fold)
2361 } else if matches!(self, HydroNode::Scan { .. }) {
2362 parse_quote!(scan)
2363 } else {
2364 parse_quote!(fold_keyed)
2365 };
2366
2367 let (HydroNode::Fold {
2368 init, acc, input, ..
2369 }
2370 | HydroNode::FoldKeyed {
2371 init, acc, input, ..
2372 }
2373 | HydroNode::Scan {
2374 init, acc, input, ..
2375 }) = self
2376 else {
2377 unreachable!()
2378 };
2379
2380 let (input, lifetime) =
2381 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2382 (input, quote!('static))
2383 } else {
2384 (input, quote!('tick))
2385 };
2386
2387 let (input_ident, input_location_id) =
2388 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2389
2390 let fold_ident =
2391 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2392
2393 match builders_or_callback {
2394 BuildersOrCallback::Builders(graph_builders) => {
2395 let builder = graph_builders.entry(input_location_id).or_default();
2396 builder.add_dfir(
2397 parse_quote! {
2398 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
2399 },
2400 None,
2401 Some(&next_stmt_id.to_string()),
2402 );
2403 }
2404 BuildersOrCallback::Callback(_, node_callback) => {
2405 node_callback(self, next_stmt_id);
2406 }
2407 }
2408
2409 *next_stmt_id += 1;
2410
2411 (fold_ident, input_location_id)
2412 }
2413
2414 HydroNode::ReduceKeyedWatermark {
2415 f,
2416 input,
2417 watermark,
2418 ..
2419 } => {
2420 let (input, lifetime) =
2421 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2422 (input, quote!('static))
2423 } else {
2424 (input, quote!('tick))
2425 };
2426
2427 let (input_ident, input_location_id) =
2428 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2429
2430 let (watermark_ident, watermark_location_id) =
2431 watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
2432
2433 let chain_ident = syn::Ident::new(
2434 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
2435 Span::call_site(),
2436 );
2437
2438 let fold_ident =
2439 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2440
2441 match builders_or_callback {
2442 BuildersOrCallback::Builders(graph_builders) => {
2443 assert_eq!(
2444 input_location_id, watermark_location_id,
2445 "ReduceKeyedWatermark inputs must be in the same location"
2446 );
2447
2448 let builder = graph_builders.entry(input_location_id).or_default();
2449 builder.add_dfir(
2454 parse_quote! {
2455 #chain_ident = chain();
2456 #input_ident
2457 -> map(|x| (Some(x), None))
2458 -> [0]#chain_ident;
2459 #watermark_ident
2460 -> map(|watermark| (None, Some(watermark)))
2461 -> [1]#chain_ident;
2462
2463 #fold_ident = #chain_ident
2464 -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
2465 let __reduce_keyed_fn = #f;
2466 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
2467 if let Some((k, v)) = opt_payload {
2468 if let Some(curr_watermark) = *opt_curr_watermark {
2469 if k <= curr_watermark {
2470 return;
2471 }
2472 }
2473 match map.entry(k) {
2474 ::std::collections::hash_map::Entry::Vacant(e) => {
2475 e.insert(v);
2476 }
2477 ::std::collections::hash_map::Entry::Occupied(mut e) => {
2478 __reduce_keyed_fn(e.get_mut(), v);
2479 }
2480 }
2481 } else {
2482 let watermark = opt_watermark.unwrap();
2483 if let Some(curr_watermark) = *opt_curr_watermark {
2484 if watermark <= curr_watermark {
2485 return;
2486 }
2487 }
2488 *opt_curr_watermark = opt_watermark;
2489 map.retain(|k, _| *k > watermark);
2490 }
2491 }
2492 })
2493 -> flat_map(|(map, _curr_watermark)| map);
2494 },
2495 None,
2496 Some(&next_stmt_id.to_string()),
2497 );
2498 }
2499 BuildersOrCallback::Callback(_, node_callback) => {
2500 node_callback(self, next_stmt_id);
2501 }
2502 }
2503
2504 *next_stmt_id += 1;
2505
2506 (fold_ident, input_location_id)
2507 }
2508
2509 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
2510 let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
2511 parse_quote!(reduce)
2512 } else {
2513 parse_quote!(reduce_keyed)
2514 };
2515
2516 let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
2517 self
2518 else {
2519 unreachable!()
2520 };
2521
2522 let (input, lifetime) =
2523 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2524 (input, quote!('static))
2525 } else {
2526 (input, quote!('tick))
2527 };
2528
2529 let (input_ident, input_location_id) =
2530 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2531
2532 let reduce_ident =
2533 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2534
2535 match builders_or_callback {
2536 BuildersOrCallback::Builders(graph_builders) => {
2537 let builder = graph_builders.entry(input_location_id).or_default();
2538 builder.add_dfir(
2539 parse_quote! {
2540 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
2541 },
2542 None,
2543 Some(&next_stmt_id.to_string()),
2544 );
2545 }
2546 BuildersOrCallback::Callback(_, node_callback) => {
2547 node_callback(self, next_stmt_id);
2548 }
2549 }
2550
2551 *next_stmt_id += 1;
2552
2553 (reduce_ident, input_location_id)
2554 }
2555
2556 HydroNode::Network {
2557 serialize_fn: serialize_pipeline,
2558 instantiate_fn,
2559 deserialize_fn: deserialize_pipeline,
2560 input,
2561 metadata,
2562 ..
2563 } => {
2564 let (input_ident, input_location_id) =
2565 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2566
2567 let to_id = metadata.location_kind.root().raw_id();
2568
2569 let receiver_stream_ident =
2570 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2571
2572 match builders_or_callback {
2573 BuildersOrCallback::Builders(graph_builders) => {
2574 let (sink_expr, source_expr) = match instantiate_fn {
2575 DebugInstantiate::Building => (
2576 syn::parse_quote!(DUMMY_SINK),
2577 syn::parse_quote!(DUMMY_SOURCE),
2578 ),
2579
2580 DebugInstantiate::Finalized(finalized) => {
2581 (finalized.sink.clone(), finalized.source.clone())
2582 }
2583 };
2584
2585 let sender_builder = graph_builders.entry(input_location_id).or_default();
2586 if let Some(serialize_pipeline) = serialize_pipeline {
2587 sender_builder.add_dfir(
2588 parse_quote! {
2589 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
2590 },
2591 None,
2592 Some(&format!("send{}", next_stmt_id)),
2594 );
2595 } else {
2596 sender_builder.add_dfir(
2597 parse_quote! {
2598 #input_ident -> dest_sink(#sink_expr);
2599 },
2600 None,
2601 Some(&format!("send{}", next_stmt_id)),
2602 );
2603 }
2604
2605 let receiver_builder = graph_builders.entry(to_id).or_default();
2606 if let Some(deserialize_pipeline) = deserialize_pipeline {
2607 receiver_builder.add_dfir(parse_quote! {
2608 #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2609 }, None, Some(&format!("recv{}", next_stmt_id)));
2610 } else {
2611 receiver_builder.add_dfir(
2612 parse_quote! {
2613 #receiver_stream_ident = source_stream(#source_expr);
2614 },
2615 None,
2616 Some(&format!("recv{}", next_stmt_id)),
2617 );
2618 }
2619 }
2620 BuildersOrCallback::Callback(_, node_callback) => {
2621 node_callback(self, next_stmt_id);
2622 }
2623 }
2624
2625 *next_stmt_id += 1;
2626
2627 (receiver_stream_ident, to_id)
2628 }
2629
2630 HydroNode::ExternalInput {
2631 instantiate_fn,
2632 deserialize_fn: deserialize_pipeline,
2633 metadata,
2634 ..
2635 } => {
2636 let to_id = metadata.location_kind.root().raw_id();
2637
2638 let receiver_stream_ident =
2639 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2640
2641 match builders_or_callback {
2642 BuildersOrCallback::Builders(graph_builders) => {
2643 let (_, source_expr) = match instantiate_fn {
2644 DebugInstantiate::Building => (
2645 syn::parse_quote!(DUMMY_SINK),
2646 syn::parse_quote!(DUMMY_SOURCE),
2647 ),
2648
2649 DebugInstantiate::Finalized(finalized) => {
2650 (finalized.sink.clone(), finalized.source.clone())
2651 }
2652 };
2653
2654 let receiver_builder = graph_builders.entry(to_id).or_default();
2655 if let Some(deserialize_pipeline) = deserialize_pipeline {
2656 receiver_builder.add_dfir(parse_quote! {
2657 #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2658 }, None, Some(&format!("recv{}", next_stmt_id)));
2659 } else {
2660 receiver_builder.add_dfir(
2661 parse_quote! {
2662 #receiver_stream_ident = source_stream(#source_expr);
2663 },
2664 None,
2665 Some(&format!("recv{}", next_stmt_id)),
2666 );
2667 }
2668 }
2669 BuildersOrCallback::Callback(_, node_callback) => {
2670 node_callback(self, next_stmt_id);
2671 }
2672 }
2673
2674 *next_stmt_id += 1;
2675
2676 (receiver_stream_ident, to_id)
2677 }
2678
2679 HydroNode::Counter {
2680 tag,
2681 duration,
2682 prefix,
2683 input,
2684 ..
2685 } => {
2686 let (input_ident, input_location_id) =
2687 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2688
2689 let counter_ident =
2690 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2691
2692 match builders_or_callback {
2693 BuildersOrCallback::Builders(graph_builders) => {
2694 let builder = graph_builders.entry(input_location_id).or_default();
2695 builder.add_dfir(
2696 parse_quote! {
2697 #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
2698 },
2699 None,
2700 Some(&next_stmt_id.to_string()),
2701 );
2702 }
2703 BuildersOrCallback::Callback(_, node_callback) => {
2704 node_callback(self, next_stmt_id);
2705 }
2706 }
2707
2708 *next_stmt_id += 1;
2709
2710 (counter_ident, input_location_id)
2711 }
2712 }
2713 }
2714
2715 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
2716 match self {
2717 HydroNode::Placeholder => {
2718 panic!()
2719 }
2720 HydroNode::Source { source, .. } => match source {
2721 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
2722 HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
2723 },
2724 HydroNode::CycleSource { .. }
2725 | HydroNode::Tee { .. }
2726 | HydroNode::Persist { .. }
2727 | HydroNode::Unpersist { .. }
2728 | HydroNode::Delta { .. }
2729 | HydroNode::Chain { .. }
2730 | HydroNode::ChainFirst { .. }
2731 | HydroNode::CrossProduct { .. }
2732 | HydroNode::CrossSingleton { .. }
2733 | HydroNode::ResolveFutures { .. }
2734 | HydroNode::ResolveFuturesOrdered { .. }
2735 | HydroNode::Join { .. }
2736 | HydroNode::Difference { .. }
2737 | HydroNode::AntiJoin { .. }
2738 | HydroNode::DeferTick { .. }
2739 | HydroNode::Enumerate { .. }
2740 | HydroNode::Unique { .. }
2741 | HydroNode::Sort { .. } => {}
2742 HydroNode::Map { f, .. }
2743 | HydroNode::FlatMap { f, .. }
2744 | HydroNode::Filter { f, .. }
2745 | HydroNode::FilterMap { f, .. }
2746 | HydroNode::Inspect { f, .. }
2747 | HydroNode::Reduce { f, .. }
2748 | HydroNode::ReduceKeyed { f, .. }
2749 | HydroNode::ReduceKeyedWatermark { f, .. } => {
2750 transform(f);
2751 }
2752 HydroNode::Fold { init, acc, .. }
2753 | HydroNode::Scan { init, acc, .. }
2754 | HydroNode::FoldKeyed { init, acc, .. } => {
2755 transform(init);
2756 transform(acc);
2757 }
2758 HydroNode::Network {
2759 serialize_fn,
2760 deserialize_fn,
2761 ..
2762 } => {
2763 if let Some(serialize_fn) = serialize_fn {
2764 transform(serialize_fn);
2765 }
2766 if let Some(deserialize_fn) = deserialize_fn {
2767 transform(deserialize_fn);
2768 }
2769 }
2770 HydroNode::ExternalInput { deserialize_fn, .. } => {
2771 if let Some(deserialize_fn) = deserialize_fn {
2772 transform(deserialize_fn);
2773 }
2774 }
2775 HydroNode::Counter { duration, .. } => {
2776 transform(duration);
2777 }
2778 }
2779 }
2780
2781 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
2782 &self.metadata().op
2783 }
2784
2785 pub fn metadata(&self) -> &HydroIrMetadata {
2786 match self {
2787 HydroNode::Placeholder => {
2788 panic!()
2789 }
2790 HydroNode::Source { metadata, .. } => metadata,
2791 HydroNode::CycleSource { metadata, .. } => metadata,
2792 HydroNode::Tee { metadata, .. } => metadata,
2793 HydroNode::Persist { metadata, .. } => metadata,
2794 HydroNode::Unpersist { metadata, .. } => metadata,
2795 HydroNode::Delta { metadata, .. } => metadata,
2796 HydroNode::Chain { metadata, .. } => metadata,
2797 HydroNode::ChainFirst { metadata, .. } => metadata,
2798 HydroNode::CrossProduct { metadata, .. } => metadata,
2799 HydroNode::CrossSingleton { metadata, .. } => metadata,
2800 HydroNode::Join { metadata, .. } => metadata,
2801 HydroNode::Difference { metadata, .. } => metadata,
2802 HydroNode::AntiJoin { metadata, .. } => metadata,
2803 HydroNode::ResolveFutures { metadata, .. } => metadata,
2804 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2805 HydroNode::Map { metadata, .. } => metadata,
2806 HydroNode::FlatMap { metadata, .. } => metadata,
2807 HydroNode::Filter { metadata, .. } => metadata,
2808 HydroNode::FilterMap { metadata, .. } => metadata,
2809 HydroNode::DeferTick { metadata, .. } => metadata,
2810 HydroNode::Enumerate { metadata, .. } => metadata,
2811 HydroNode::Inspect { metadata, .. } => metadata,
2812 HydroNode::Unique { metadata, .. } => metadata,
2813 HydroNode::Sort { metadata, .. } => metadata,
2814 HydroNode::Scan { metadata, .. } => metadata,
2815 HydroNode::Fold { metadata, .. } => metadata,
2816 HydroNode::FoldKeyed { metadata, .. } => metadata,
2817 HydroNode::Reduce { metadata, .. } => metadata,
2818 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2819 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2820 HydroNode::ExternalInput { metadata, .. } => metadata,
2821 HydroNode::Network { metadata, .. } => metadata,
2822 HydroNode::Counter { metadata, .. } => metadata,
2823 }
2824 }
2825
2826 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
2827 &mut self.metadata_mut().op
2828 }
2829
2830 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2831 match self {
2832 HydroNode::Placeholder => {
2833 panic!()
2834 }
2835 HydroNode::Source { metadata, .. } => metadata,
2836 HydroNode::CycleSource { metadata, .. } => metadata,
2837 HydroNode::Tee { metadata, .. } => metadata,
2838 HydroNode::Persist { metadata, .. } => metadata,
2839 HydroNode::Unpersist { metadata, .. } => metadata,
2840 HydroNode::Delta { metadata, .. } => metadata,
2841 HydroNode::Chain { metadata, .. } => metadata,
2842 HydroNode::ChainFirst { metadata, .. } => metadata,
2843 HydroNode::CrossProduct { metadata, .. } => metadata,
2844 HydroNode::CrossSingleton { metadata, .. } => metadata,
2845 HydroNode::Join { metadata, .. } => metadata,
2846 HydroNode::Difference { metadata, .. } => metadata,
2847 HydroNode::AntiJoin { metadata, .. } => metadata,
2848 HydroNode::ResolveFutures { metadata, .. } => metadata,
2849 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2850 HydroNode::Map { metadata, .. } => metadata,
2851 HydroNode::FlatMap { metadata, .. } => metadata,
2852 HydroNode::Filter { metadata, .. } => metadata,
2853 HydroNode::FilterMap { metadata, .. } => metadata,
2854 HydroNode::DeferTick { metadata, .. } => metadata,
2855 HydroNode::Enumerate { metadata, .. } => metadata,
2856 HydroNode::Inspect { metadata, .. } => metadata,
2857 HydroNode::Unique { metadata, .. } => metadata,
2858 HydroNode::Sort { metadata, .. } => metadata,
2859 HydroNode::Scan { metadata, .. } => metadata,
2860 HydroNode::Fold { metadata, .. } => metadata,
2861 HydroNode::FoldKeyed { metadata, .. } => metadata,
2862 HydroNode::Reduce { metadata, .. } => metadata,
2863 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2864 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2865 HydroNode::ExternalInput { metadata, .. } => metadata,
2866 HydroNode::Network { metadata, .. } => metadata,
2867 HydroNode::Counter { metadata, .. } => metadata,
2868 }
2869 }
2870
2871 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
2872 match self {
2873 HydroNode::Placeholder => {
2874 panic!()
2875 }
2876 HydroNode::Source { .. }
2877 | HydroNode::ExternalInput { .. }
2878 | HydroNode::CycleSource { .. } | HydroNode::Tee { .. } => {
2880 vec![]
2881 }
2882 HydroNode::Persist { inner, .. }
2883 | HydroNode::Unpersist { inner, .. }
2884 | HydroNode::Delta { inner, .. } => {
2885 vec![inner.metadata()]
2886 }
2887 HydroNode::Chain { first, second, .. } => {
2888 vec![first.metadata(), second.metadata()]
2889 }
2890 HydroNode::ChainFirst { first, second, .. } => {
2891 vec![first.metadata(), second.metadata()]
2892 }
2893 HydroNode::CrossProduct { left, right, .. }
2894 | HydroNode::CrossSingleton { left, right, .. }
2895 | HydroNode::Join { left, right, .. } => {
2896 vec![left.metadata(), right.metadata()]
2897 }
2898 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2899 vec![pos.metadata(), neg.metadata()]
2900 }
2901 HydroNode::Map { input, .. }
2902 | HydroNode::FlatMap { input, .. }
2903 | HydroNode::Filter { input, .. }
2904 | HydroNode::FilterMap { input, .. }
2905 | HydroNode::Sort { input, .. }
2906 | HydroNode::DeferTick { input, .. }
2907 | HydroNode::Enumerate { input, .. }
2908 | HydroNode::Inspect { input, .. }
2909 | HydroNode::Unique { input, .. }
2910 | HydroNode::Network { input, .. }
2911 | HydroNode::Counter { input, .. }
2912 | HydroNode::ResolveFutures { input, .. }
2913 | HydroNode::ResolveFuturesOrdered { input, .. } => {
2914 vec![input.metadata()]
2915 }
2916 HydroNode::Fold { input, .. }
2917 | HydroNode::FoldKeyed { input, .. }
2918 | HydroNode::Reduce { input, .. }
2919 | HydroNode::ReduceKeyed { input, .. }
2920 | HydroNode::Scan { input, .. } => {
2921 if let HydroNode::Persist { inner, .. } = input.as_ref() {
2923 vec![inner.metadata()]
2924 } else {
2925 vec![input.metadata()]
2926 }
2927 }
2928 HydroNode::ReduceKeyedWatermark { input, watermark, .. } => {
2929 if let HydroNode::Persist { inner, .. } = input.as_ref() {
2931 vec![inner.metadata(), watermark.metadata()]
2932 } else {
2933 vec![input.metadata(), watermark.metadata()]
2934 }
2935 }
2936 }
2937 }
2938
2939 pub fn print_root(&self) -> String {
2940 match self {
2941 HydroNode::Placeholder => {
2942 panic!()
2943 }
2944 HydroNode::Source { source, .. } => format!("Source({:?})", source),
2945 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
2946 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
2947 HydroNode::Persist { .. } => "Persist()".to_string(),
2948 HydroNode::Unpersist { .. } => "Unpersist()".to_string(),
2949 HydroNode::Delta { .. } => "Delta()".to_string(),
2950 HydroNode::Chain { first, second, .. } => {
2951 format!("Chain({}, {})", first.print_root(), second.print_root())
2952 }
2953 HydroNode::ChainFirst { first, second, .. } => {
2954 format!(
2955 "ChainFirst({}, {})",
2956 first.print_root(),
2957 second.print_root()
2958 )
2959 }
2960 HydroNode::CrossProduct { left, right, .. } => {
2961 format!(
2962 "CrossProduct({}, {})",
2963 left.print_root(),
2964 right.print_root()
2965 )
2966 }
2967 HydroNode::CrossSingleton { left, right, .. } => {
2968 format!(
2969 "CrossSingleton({}, {})",
2970 left.print_root(),
2971 right.print_root()
2972 )
2973 }
2974 HydroNode::Join { left, right, .. } => {
2975 format!("Join({}, {})", left.print_root(), right.print_root())
2976 }
2977 HydroNode::Difference { pos, neg, .. } => {
2978 format!("Difference({}, {})", pos.print_root(), neg.print_root())
2979 }
2980 HydroNode::AntiJoin { pos, neg, .. } => {
2981 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
2982 }
2983 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
2984 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
2985 HydroNode::Map { f, .. } => format!("Map({:?})", f),
2986 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
2987 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
2988 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
2989 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
2990 HydroNode::Enumerate { is_static, .. } => format!("Enumerate({:?})", is_static),
2991 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
2992 HydroNode::Unique { .. } => "Unique()".to_string(),
2993 HydroNode::Sort { .. } => "Sort()".to_string(),
2994 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
2995 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
2996 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
2997 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
2998 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
2999 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3000 HydroNode::Network { .. } => "Network()".to_string(),
3001 HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3002 HydroNode::Counter { tag, duration, .. } => {
3003 format!("Counter({:?}, {:?})", tag, duration)
3004 }
3005 }
3006 }
3007}
3008
3009#[cfg(feature = "build")]
3010fn instantiate_network<'a, D>(
3011 from_location: &LocationId,
3012 to_location: &LocationId,
3013 processes: &HashMap<usize, D::Process>,
3014 clusters: &HashMap<usize, D::Cluster>,
3015 compile_env: &D::CompileEnv,
3016) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3017where
3018 D: Deploy<'a>,
3019{
3020 let ((sink, source), connect_fn) = match (from_location, to_location) {
3021 (LocationId::Process(from), LocationId::Process(to)) => {
3022 let from_node = processes
3023 .get(from)
3024 .unwrap_or_else(|| {
3025 panic!("A process used in the graph was not instantiated: {}", from)
3026 })
3027 .clone();
3028 let to_node = processes
3029 .get(to)
3030 .unwrap_or_else(|| {
3031 panic!("A process used in the graph was not instantiated: {}", to)
3032 })
3033 .clone();
3034
3035 let sink_port = D::allocate_process_port(&from_node);
3036 let source_port = D::allocate_process_port(&to_node);
3037
3038 (
3039 D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3040 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3041 )
3042 }
3043 (LocationId::Process(from), LocationId::Cluster(to)) => {
3044 let from_node = processes
3045 .get(from)
3046 .unwrap_or_else(|| {
3047 panic!("A process used in the graph was not instantiated: {}", from)
3048 })
3049 .clone();
3050 let to_node = clusters
3051 .get(to)
3052 .unwrap_or_else(|| {
3053 panic!("A cluster used in the graph was not instantiated: {}", to)
3054 })
3055 .clone();
3056
3057 let sink_port = D::allocate_process_port(&from_node);
3058 let source_port = D::allocate_cluster_port(&to_node);
3059
3060 (
3061 D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3062 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3063 )
3064 }
3065 (LocationId::Cluster(from), LocationId::Process(to)) => {
3066 let from_node = clusters
3067 .get(from)
3068 .unwrap_or_else(|| {
3069 panic!("A cluster used in the graph was not instantiated: {}", from)
3070 })
3071 .clone();
3072 let to_node = processes
3073 .get(to)
3074 .unwrap_or_else(|| {
3075 panic!("A process used in the graph was not instantiated: {}", to)
3076 })
3077 .clone();
3078
3079 let sink_port = D::allocate_cluster_port(&from_node);
3080 let source_port = D::allocate_process_port(&to_node);
3081
3082 (
3083 D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3084 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
3085 )
3086 }
3087 (LocationId::Cluster(from), LocationId::Cluster(to)) => {
3088 let from_node = clusters
3089 .get(from)
3090 .unwrap_or_else(|| {
3091 panic!("A cluster used in the graph was not instantiated: {}", from)
3092 })
3093 .clone();
3094 let to_node = clusters
3095 .get(to)
3096 .unwrap_or_else(|| {
3097 panic!("A cluster used in the graph was not instantiated: {}", to)
3098 })
3099 .clone();
3100
3101 let sink_port = D::allocate_cluster_port(&from_node);
3102 let source_port = D::allocate_cluster_port(&to_node);
3103
3104 (
3105 D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3106 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
3107 )
3108 }
3109 (LocationId::Tick(_, _), _) => panic!(),
3110 (_, LocationId::Tick(_, _)) => panic!(),
3111 };
3112 (sink, source, connect_fn)
3113}
3114
3115#[cfg(test)]
3116mod test {
3117 use std::mem::size_of;
3118
3119 use stageleft::{QuotedWithContext, q};
3120
3121 use super::*;
3122
3123 #[test]
3124 fn hydro_node_size() {
3125 assert_eq!(size_of::<HydroNode>(), 248);
3126 }
3127
3128 #[test]
3129 fn hydro_root_size() {
3130 assert_eq!(size_of::<HydroRoot>(), 176);
3131 }
3132
3133 #[test]
3134 fn test_simplify_q_macro_basic() {
3135 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
3137 let result = simplify_q_macro(simple_expr.clone());
3138 assert_eq!(result, simple_expr);
3139 }
3140
3141 #[test]
3142 fn test_simplify_q_macro_actual_stageleft_call() {
3143 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
3145 let result = simplify_q_macro(stageleft_call);
3146 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3149 }
3150
3151 #[test]
3152 fn test_closure_no_pipe_at_start() {
3153 let stageleft_call = q!({
3155 let foo = 123;
3156 move |b: usize| b + foo
3157 })
3158 .splice_fn1_ctx(&());
3159 let result = simplify_q_macro(stageleft_call);
3160 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3161 }
3162}