1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::NetworkHint;
25#[cfg(stageleft_runtime)]
26use crate::backtrace::Backtrace;
27#[cfg(feature = "build")]
28use crate::deploy::{Deploy, RegisterPort};
29use crate::location::LocationId;
30
31#[derive(Clone, Hash)]
35pub struct DebugExpr(pub Box<syn::Expr>);
36
37impl From<syn::Expr> for DebugExpr {
38 fn from(expr: syn::Expr) -> Self {
39 Self(Box::new(expr))
40 }
41}
42
43impl Deref for DebugExpr {
44 type Target = syn::Expr;
45
46 fn deref(&self) -> &Self::Target {
47 &self.0
48 }
49}
50
51impl ToTokens for DebugExpr {
52 fn to_tokens(&self, tokens: &mut TokenStream) {
53 self.0.to_tokens(tokens);
54 }
55}
56
57impl Debug for DebugExpr {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 write!(f, "{}", self.0.to_token_stream())
60 }
61}
62
63impl Display for DebugExpr {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 let original = self.0.as_ref().clone();
66 let simplified = simplify_q_macro(original);
67
68 write!(f, "q!({})", quote::quote!(#simplified))
71 }
72}
73
74fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
76 let mut simplifier = QMacroSimplifier::new();
79 simplifier.visit_expr_mut(&mut expr);
80
81 if let Some(simplified) = simplifier.simplified_result {
83 simplified
84 } else {
85 expr
86 }
87}
88
89#[derive(Default)]
91pub struct QMacroSimplifier {
92 pub simplified_result: Option<syn::Expr>,
93}
94
95impl QMacroSimplifier {
96 pub fn new() -> Self {
97 Self::default()
98 }
99}
100
101impl VisitMut for QMacroSimplifier {
102 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
103 if self.simplified_result.is_some() {
105 return;
106 }
107
108 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
109 && self.is_stageleft_runtime_support_call(&path_expr.path)
111 && let Some(closure) = self.extract_closure_from_args(&call.args)
113 {
114 self.simplified_result = Some(closure);
115 return;
116 }
117
118 syn::visit_mut::visit_expr_mut(self, expr);
121 }
122}
123
124impl QMacroSimplifier {
125 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
126 if let Some(last_segment) = path.segments.last() {
128 let fn_name = last_segment.ident.to_string();
129 fn_name.contains("_type_hint")
131 && path.segments.len() > 2
132 && path.segments[0].ident == "stageleft"
133 && path.segments[1].ident == "runtime_support"
134 } else {
135 false
136 }
137 }
138
139 fn extract_closure_from_args(
140 &self,
141 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
142 ) -> Option<syn::Expr> {
143 for arg in args {
145 if let syn::Expr::Closure(_) = arg {
146 return Some(arg.clone());
147 }
148 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
150 return Some(closure_expr);
151 }
152 }
153 None
154 }
155
156 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
157 let mut visitor = ClosureFinder {
158 found_closure: None,
159 prefer_inner_blocks: true,
160 };
161 visitor.visit_expr(expr);
162 visitor.found_closure
163 }
164}
165
166struct ClosureFinder {
168 found_closure: Option<syn::Expr>,
169 prefer_inner_blocks: bool,
170}
171
172impl<'ast> Visit<'ast> for ClosureFinder {
173 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
174 if self.found_closure.is_some() {
176 return;
177 }
178
179 match expr {
180 syn::Expr::Closure(_) => {
181 self.found_closure = Some(expr.clone());
182 }
183 syn::Expr::Block(block) if self.prefer_inner_blocks => {
184 for stmt in &block.block.stmts {
186 if let syn::Stmt::Expr(stmt_expr, _) = stmt
187 && let syn::Expr::Block(_) = stmt_expr
188 {
189 let mut inner_visitor = ClosureFinder {
191 found_closure: None,
192 prefer_inner_blocks: false, };
194 inner_visitor.visit_expr(stmt_expr);
195 if inner_visitor.found_closure.is_some() {
196 self.found_closure = Some(stmt_expr.clone());
198 return;
199 }
200 }
201 }
202
203 visit::visit_expr(self, expr);
205
206 if self.found_closure.is_some() {
209 }
211 }
212 _ => {
213 visit::visit_expr(self, expr);
215 }
216 }
217 }
218}
219
220#[derive(Clone, Hash)]
224pub struct DebugType(pub Box<syn::Type>);
225
226impl From<syn::Type> for DebugType {
227 fn from(t: syn::Type) -> Self {
228 Self(Box::new(t))
229 }
230}
231
232impl Deref for DebugType {
233 type Target = syn::Type;
234
235 fn deref(&self) -> &Self::Target {
236 &self.0
237 }
238}
239
240impl ToTokens for DebugType {
241 fn to_tokens(&self, tokens: &mut TokenStream) {
242 self.0.to_tokens(tokens);
243 }
244}
245
246impl Debug for DebugType {
247 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248 write!(f, "{}", self.0.to_token_stream())
249 }
250}
251
252pub enum DebugInstantiate {
253 Building,
254 Finalized(Box<DebugInstantiateFinalized>),
255}
256
257#[cfg_attr(
258 not(feature = "build"),
259 expect(
260 dead_code,
261 reason = "sink, source unused without `feature = \"build\"`."
262 )
263)]
264pub struct DebugInstantiateFinalized {
265 sink: syn::Expr,
266 source: syn::Expr,
267 connect_fn: Option<Box<dyn FnOnce()>>,
268}
269
270impl From<DebugInstantiateFinalized> for DebugInstantiate {
271 fn from(f: DebugInstantiateFinalized) -> Self {
272 Self::Finalized(Box::new(f))
273 }
274}
275
276impl Debug for DebugInstantiate {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 write!(f, "<network instantiate>")
279 }
280}
281
282impl Hash for DebugInstantiate {
283 fn hash<H: Hasher>(&self, _state: &mut H) {
284 }
286}
287
288impl Clone for DebugInstantiate {
289 fn clone(&self) -> Self {
290 match self {
291 DebugInstantiate::Building => DebugInstantiate::Building,
292 DebugInstantiate::Finalized(_) => {
293 panic!("DebugInstantiate::Finalized should not be cloned")
294 }
295 }
296 }
297}
298
299#[derive(Debug, Hash, Clone)]
301pub enum HydroSource {
302 Stream(DebugExpr),
303 ExternalNetwork(),
304 Iter(DebugExpr),
305 Spin(),
306}
307
/// Selects what the `emit_core` traversal does at each leaf and node.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroLeaf, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Add DFIR statements to the builders in this map (`emit_core` keys it
    /// by the location id of the leaf's input).
    Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
    /// Call the first closure for leaves and the second for nodes, passing
    /// the running statement id.
    Callback(L, N),
}
317
318#[derive(Debug, Hash)]
322pub enum HydroLeaf {
323 ForEach {
324 f: DebugExpr,
325 input: Box<HydroNode>,
326 metadata: HydroIrMetadata,
327 },
328 SendExternal {
329 to_external_id: usize,
330 to_key: usize,
331 to_many: bool,
332 serialize_fn: Option<DebugExpr>,
333 instantiate_fn: DebugInstantiate,
334 input: Box<HydroNode>,
335 },
336 DestSink {
337 sink: DebugExpr,
338 input: Box<HydroNode>,
339 metadata: HydroIrMetadata,
340 },
341 CycleSink {
342 ident: syn::Ident,
343 input: Box<HydroNode>,
344 metadata: HydroIrMetadata,
345 },
346}
347
348impl HydroLeaf {
349 #[cfg(feature = "build")]
350 pub fn compile_network<'a, D>(
351 &mut self,
352 compile_env: &D::CompileEnv,
353 extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
354 seen_tees: &mut SeenTees,
355 processes: &HashMap<usize, D::Process>,
356 clusters: &HashMap<usize, D::Cluster>,
357 externals: &HashMap<usize, D::External>,
358 ) where
359 D: Deploy<'a>,
360 {
361 self.transform_bottom_up(
362 &mut |l| {
363 if let HydroLeaf::SendExternal {
364 input,
365 to_external_id,
366 to_key,
367 to_many,
368 instantiate_fn,
369 ..
370 } = l
371 {
372 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
373 DebugInstantiate::Building => {
374 let to_node = externals
375 .get(to_external_id)
376 .unwrap_or_else(|| {
377 panic!("A external used in the graph was not instantiated: {}", to_external_id)
378 })
379 .clone();
380
381 match input.metadata().location_kind.root() {
382 LocationId::Process(process_id) => {
383 if *to_many {
384 (
385 (
386 D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
387 parse_quote!(DUMMY),
388 ),
389 Box::new(|| {}) as Box<dyn FnOnce()>,
390 )
391 } else {
392 let from_node = processes
393 .get(process_id)
394 .unwrap_or_else(|| {
395 panic!("A process used in the graph was not instantiated: {}", process_id)
396 })
397 .clone();
398
399 let sink_port = D::allocate_process_port(&from_node);
400 let source_port = D::allocate_external_port(&to_node);
401
402 to_node.register(*to_key, source_port.clone());
403
404 (
405 (
406 D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
407 parse_quote!(DUMMY),
408 ),
409 D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
410 )
411 }
412 }
413 LocationId::Cluster(_) => todo!(),
414 _ => panic!()
415 }
416 },
417
418 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
419 };
420
421 *instantiate_fn = DebugInstantiateFinalized {
422 sink: sink_expr,
423 source: source_expr,
424 connect_fn: Some(connect_fn),
425 }
426 .into();
427 }
428 },
429 &mut |n| {
430 if let HydroNode::Network {
431 input,
432 instantiate_fn,
433 metadata,
434 ..
435 } = n
436 {
437 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
438 DebugInstantiate::Building => instantiate_network::<D>(
439 input.metadata().location_kind.root(),
440 metadata.location_kind.root(),
441 processes,
442 clusters,
443 compile_env,
444 ),
445
446 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
447 };
448
449 *instantiate_fn = DebugInstantiateFinalized {
450 sink: sink_expr,
451 source: source_expr,
452 connect_fn: Some(connect_fn),
453 }
454 .into();
455 } else if let HydroNode::ExternalInput {
456 from_external_id,
457 from_key,
458 from_many,
459 codec_type,
460 port_hint,
461 instantiate_fn,
462 metadata,
463 ..
464 } = n
465 {
466 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
467 DebugInstantiate::Building => {
468 let from_node = externals
469 .get(from_external_id)
470 .unwrap_or_else(|| {
471 panic!(
472 "A external used in the graph was not instantiated: {}",
473 from_external_id
474 )
475 })
476 .clone();
477
478 match metadata.location_kind.root() {
479 LocationId::Process(process_id) => {
480 let to_node = processes
481 .get(process_id)
482 .unwrap_or_else(|| {
483 panic!("A process used in the graph was not instantiated: {}", process_id)
484 })
485 .clone();
486
487 let sink_port = D::allocate_external_port(&from_node);
488 let source_port = D::allocate_process_port(&to_node);
489
490 from_node.register(*from_key, sink_port.clone());
491
492 (
493 (
494 parse_quote!(DUMMY),
495 if *from_many {
496 D::e2o_many_source(
497 compile_env,
498 extra_stmts.entry(*process_id).or_default(),
499 &to_node, &source_port,
500 codec_type.0.as_ref(),
501 format!("{}_{}", *from_external_id, *from_key)
502 )
503 } else {
504 D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port)
505 },
506 ),
507 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
508 )
509 }
510 LocationId::Cluster(_) => todo!(),
511 _ => panic!()
512 }
513 },
514
515 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
516 };
517
518 *instantiate_fn = DebugInstantiateFinalized {
519 sink: sink_expr,
520 source: source_expr,
521 connect_fn: Some(connect_fn),
522 }
523 .into();
524 }
525 },
526 seen_tees,
527 false,
528 );
529 }
530
531 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
532 self.transform_bottom_up(
533 &mut |l| {
534 if let HydroLeaf::SendExternal { instantiate_fn, .. } = l {
535 match instantiate_fn {
536 DebugInstantiate::Building => panic!("network not built"),
537
538 DebugInstantiate::Finalized(finalized) => {
539 (finalized.connect_fn.take().unwrap())();
540 }
541 }
542 }
543 },
544 &mut |n| {
545 if let HydroNode::Network { instantiate_fn, .. }
546 | HydroNode::ExternalInput { instantiate_fn, .. } = n
547 {
548 match instantiate_fn {
549 DebugInstantiate::Building => panic!("network not built"),
550
551 DebugInstantiate::Finalized(finalized) => {
552 (finalized.connect_fn.take().unwrap())();
553 }
554 }
555 }
556 },
557 seen_tees,
558 false,
559 );
560 }
561
562 pub fn transform_bottom_up(
563 &mut self,
564 transform_leaf: &mut impl FnMut(&mut HydroLeaf),
565 transform_node: &mut impl FnMut(&mut HydroNode),
566 seen_tees: &mut SeenTees,
567 check_well_formed: bool,
568 ) {
569 self.transform_children(
570 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
571 seen_tees,
572 );
573
574 transform_leaf(self);
575 }
576
577 pub fn transform_children(
578 &mut self,
579 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
580 seen_tees: &mut SeenTees,
581 ) {
582 match self {
583 HydroLeaf::ForEach { input, .. }
584 | HydroLeaf::SendExternal { input, .. }
585 | HydroLeaf::DestSink { input, .. }
586 | HydroLeaf::CycleSink { input, .. } => {
587 transform(input, seen_tees);
588 }
589 }
590 }
591
592 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroLeaf {
593 match self {
594 HydroLeaf::ForEach { f, input, metadata } => HydroLeaf::ForEach {
595 f: f.clone(),
596 input: Box::new(input.deep_clone(seen_tees)),
597 metadata: metadata.clone(),
598 },
599 HydroLeaf::SendExternal {
600 to_external_id,
601 to_key,
602 to_many,
603 serialize_fn,
604 instantiate_fn,
605 input,
606 } => HydroLeaf::SendExternal {
607 to_external_id: *to_external_id,
608 to_key: *to_key,
609 to_many: *to_many,
610 serialize_fn: serialize_fn.clone(),
611 instantiate_fn: instantiate_fn.clone(),
612 input: Box::new(input.deep_clone(seen_tees)),
613 },
614 HydroLeaf::DestSink {
615 sink,
616 input,
617 metadata,
618 } => HydroLeaf::DestSink {
619 sink: sink.clone(),
620 input: Box::new(input.deep_clone(seen_tees)),
621 metadata: metadata.clone(),
622 },
623 HydroLeaf::CycleSink {
624 ident,
625 input,
626 metadata,
627 } => HydroLeaf::CycleSink {
628 ident: ident.clone(),
629 input: Box::new(input.deep_clone(seen_tees)),
630 metadata: metadata.clone(),
631 },
632 }
633 }
634
635 #[cfg(feature = "build")]
636 pub fn emit(
637 &mut self,
638 graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
639 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
640 next_stmt_id: &mut usize,
641 ) {
642 self.emit_core(
643 &mut BuildersOrCallback::Builders::<
644 fn(&mut HydroLeaf, &mut usize),
645 fn(&mut HydroNode, &mut usize),
646 >(graph_builders),
647 built_tees,
648 next_stmt_id,
649 );
650 }
651
652 #[cfg(feature = "build")]
653 pub fn emit_core(
654 &mut self,
655 builders_or_callback: &mut BuildersOrCallback<
656 impl FnMut(&mut HydroLeaf, &mut usize),
657 impl FnMut(&mut HydroNode, &mut usize),
658 >,
659 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
660 next_stmt_id: &mut usize,
661 ) {
662 match self {
663 HydroLeaf::ForEach { f, input, .. } => {
664 let (input_ident, input_location_id) =
665 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
666
667 match builders_or_callback {
668 BuildersOrCallback::Builders(graph_builders) => {
669 graph_builders
670 .entry(input_location_id)
671 .or_default()
672 .add_dfir(
673 parse_quote! {
674 #input_ident -> for_each(#f);
675 },
676 None,
677 Some(&next_stmt_id.to_string()),
678 );
679 }
680 BuildersOrCallback::Callback(leaf_callback, _) => {
681 leaf_callback(self, next_stmt_id);
682 }
683 }
684
685 *next_stmt_id += 1;
686 }
687
688 HydroLeaf::SendExternal {
689 serialize_fn,
690 instantiate_fn,
691 input,
692 ..
693 } => {
694 let (input_ident, input_location_id) =
695 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
696
697 match builders_or_callback {
698 BuildersOrCallback::Builders(graph_builders) => {
699 let (sink_expr, _) = match instantiate_fn {
700 DebugInstantiate::Building => (
701 syn::parse_quote!(DUMMY_SINK),
702 syn::parse_quote!(DUMMY_SOURCE),
703 ),
704
705 DebugInstantiate::Finalized(finalized) => {
706 (finalized.sink.clone(), finalized.source.clone())
707 }
708 };
709
710 let sender_builder = graph_builders.entry(input_location_id).or_default();
711 if let Some(serialize_fn) = serialize_fn {
712 sender_builder.add_dfir(
713 parse_quote! {
714 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
715 },
716 None,
717 Some(&format!("send{}", next_stmt_id)),
719 );
720 } else {
721 sender_builder.add_dfir(
722 parse_quote! {
723 #input_ident -> dest_sink(#sink_expr);
724 },
725 None,
726 Some(&format!("send{}", next_stmt_id)),
727 );
728 }
729 }
730 BuildersOrCallback::Callback(leaf_callback, _) => {
731 leaf_callback(self, next_stmt_id);
732 }
733 }
734
735 *next_stmt_id += 1;
736 }
737
738 HydroLeaf::DestSink { sink, input, .. } => {
739 let (input_ident, input_location_id) =
740 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
741
742 match builders_or_callback {
743 BuildersOrCallback::Builders(graph_builders) => {
744 graph_builders
745 .entry(input_location_id)
746 .or_default()
747 .add_dfir(
748 parse_quote! {
749 #input_ident -> dest_sink(#sink);
750 },
751 None,
752 Some(&next_stmt_id.to_string()),
753 );
754 }
755 BuildersOrCallback::Callback(leaf_callback, _) => {
756 leaf_callback(self, next_stmt_id);
757 }
758 }
759
760 *next_stmt_id += 1;
761 }
762
763 HydroLeaf::CycleSink {
764 ident,
765 input,
766 metadata,
767 ..
768 } => {
769 let (input_ident, input_location_id) =
770 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
771
772 let location_id = metadata.location_kind.root().raw_id();
773
774 match builders_or_callback {
775 BuildersOrCallback::Builders(graph_builders) => {
776 assert_eq!(
777 input_location_id, location_id,
778 "cycle_sink location mismatch"
779 );
780
781 graph_builders.entry(location_id).or_default().add_dfir(
782 parse_quote! {
783 #ident = #input_ident;
784 },
785 None,
786 None,
787 );
788 }
789 BuildersOrCallback::Callback(_, _) => {}
791 }
792 }
793 }
794 }
795
796 pub fn metadata(&self) -> &HydroIrMetadata {
797 match self {
798 HydroLeaf::ForEach { metadata, .. }
799 | HydroLeaf::DestSink { metadata, .. }
800 | HydroLeaf::CycleSink { metadata, .. } => metadata,
801 HydroLeaf::SendExternal { .. } => panic!(),
802 }
803 }
804
805 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
806 match self {
807 HydroLeaf::ForEach { metadata, .. }
808 | HydroLeaf::DestSink { metadata, .. }
809 | HydroLeaf::CycleSink { metadata, .. } => metadata,
810 HydroLeaf::SendExternal { .. } => panic!(),
811 }
812 }
813
814 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
815 match self {
816 HydroLeaf::ForEach { input, .. }
817 | HydroLeaf::SendExternal { input, .. }
818 | HydroLeaf::DestSink { input, .. }
819 | HydroLeaf::CycleSink { input, .. } => {
820 vec![input.metadata()]
821 }
822 }
823 }
824
825 pub fn print_root(&self) -> String {
826 match self {
827 HydroLeaf::ForEach { f, .. } => format!("ForEach({:?})", f),
828 HydroLeaf::SendExternal { .. } => "SendExternal".to_string(),
829 HydroLeaf::DestSink { sink, .. } => format!("DestSink({:?})", sink),
830 HydroLeaf::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
831 }
832 }
833
834 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
835 match self {
836 HydroLeaf::ForEach { f, .. } | HydroLeaf::DestSink { sink: f, .. } => {
837 transform(f);
838 }
839 HydroLeaf::SendExternal { .. } | HydroLeaf::CycleSink { .. } => {}
840 }
841 }
842}
843
/// Emits every leaf of `ir` into a fresh set of graph builders and returns
/// them. Tee bookkeeping and the statement counter (starting at 0) are shared
/// across all leaves.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroLeaf>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut builders = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;
    ir.iter_mut()
        .for_each(|leaf| leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id));
    builders
}
854
/// Walks `ir` with `emit_core` in `Callback` mode, handing each leaf to
/// `transform_leaf` and each node to `transform_node` together with the
/// running statement id (starting at 0).
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroLeaf],
    transform_leaf: impl FnMut(&mut HydroLeaf, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut seen_tees = HashMap::new();
    let mut next_stmt_id = 0;
    let mut callback = BuildersOrCallback::Callback(transform_leaf, transform_node);
    for leaf in ir.iter_mut() {
        leaf.emit_core(&mut callback, &mut seen_tees, &mut next_stmt_id);
    }
}
868
869pub fn transform_bottom_up(
870 ir: &mut [HydroLeaf],
871 transform_leaf: &mut impl FnMut(&mut HydroLeaf),
872 transform_node: &mut impl FnMut(&mut HydroNode),
873 check_well_formed: bool,
874) {
875 let mut seen_tees = HashMap::new();
876 ir.iter_mut().for_each(|leaf| {
877 leaf.transform_bottom_up(
878 transform_leaf,
879 transform_node,
880 &mut seen_tees,
881 check_well_formed,
882 );
883 });
884}
885
886pub fn deep_clone(ir: &[HydroLeaf]) -> Vec<HydroLeaf> {
887 let mut seen_tees = HashMap::new();
888 ir.iter()
889 .map(|leaf| leaf.deep_clone(&mut seen_tees))
890 .collect()
891}
892
893type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
894thread_local! {
895 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
896}
897
898pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
899 PRINTED_TEES.with(|printed_tees| {
900 let mut printed_tees_mut = printed_tees.borrow_mut();
901 *printed_tees_mut = Some((0, HashMap::new()));
902 drop(printed_tees_mut);
903
904 let ret = f();
905
906 let mut printed_tees_mut = printed_tees.borrow_mut();
907 *printed_tees_mut = None;
908
909 ret
910 })
911}
912
913pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
914
915impl TeeNode {
916 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
917 Rc::as_ptr(&self.0)
918 }
919}
920
921impl Debug for TeeNode {
922 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
923 PRINTED_TEES.with(|printed_tees| {
924 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
925 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
926
927 if let Some(printed_tees_mut) = printed_tees_mut {
928 if let Some(existing) = printed_tees_mut
929 .1
930 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
931 {
932 write!(f, "<tee {}>", existing)
933 } else {
934 let next_id = printed_tees_mut.0;
935 printed_tees_mut.0 += 1;
936 printed_tees_mut
937 .1
938 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
939 drop(printed_tees_mut_borrow);
940 write!(f, "<tee {}>: ", next_id)?;
941 Debug::fmt(&self.0.borrow(), f)
942 }
943 } else {
944 drop(printed_tees_mut_borrow);
945 write!(f, "<tee>: ")?;
946 Debug::fmt(&self.0.borrow(), f)
947 }
948 })
949 }
950}
951
952impl Hash for TeeNode {
953 fn hash<H: Hasher>(&self, state: &mut H) {
954 self.0.borrow_mut().hash(state);
955 }
956}
957
958#[derive(Clone)]
959pub struct HydroIrMetadata {
960 pub location_kind: LocationId,
961 pub backtrace: Backtrace,
962 pub output_type: Option<DebugType>,
963 pub cardinality: Option<usize>,
964 pub cpu_usage: Option<f64>,
965 pub network_recv_cpu_usage: Option<f64>,
966 pub id: Option<usize>,
967 pub tag: Option<String>,
968}
969
970impl Hash for HydroIrMetadata {
972 fn hash<H: Hasher>(&self, _: &mut H) {}
973}
974
975impl PartialEq for HydroIrMetadata {
976 fn eq(&self, _: &Self) -> bool {
977 true
978 }
979}
980
981impl Eq for HydroIrMetadata {}
982
983impl Debug for HydroIrMetadata {
984 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
985 f.debug_struct("HydroIrMetadata")
986 .field("location_kind", &self.location_kind)
987 .field("output_type", &self.output_type)
988 .finish()
989 }
990}
991
992#[derive(Debug, Hash)]
995pub enum HydroNode {
996 Placeholder,
997
998 Source {
999 source: HydroSource,
1000 metadata: HydroIrMetadata,
1001 },
1002
1003 CycleSource {
1004 ident: syn::Ident,
1005 metadata: HydroIrMetadata,
1006 },
1007
1008 Tee {
1009 inner: TeeNode,
1010 metadata: HydroIrMetadata,
1011 },
1012
1013 Persist {
1014 inner: Box<HydroNode>,
1015 metadata: HydroIrMetadata,
1016 },
1017
1018 Unpersist {
1019 inner: Box<HydroNode>,
1020 metadata: HydroIrMetadata,
1021 },
1022
1023 Delta {
1024 inner: Box<HydroNode>,
1025 metadata: HydroIrMetadata,
1026 },
1027
1028 Chain {
1029 first: Box<HydroNode>,
1030 second: Box<HydroNode>,
1031 metadata: HydroIrMetadata,
1032 },
1033
1034 CrossProduct {
1035 left: Box<HydroNode>,
1036 right: Box<HydroNode>,
1037 metadata: HydroIrMetadata,
1038 },
1039
1040 CrossSingleton {
1041 left: Box<HydroNode>,
1042 right: Box<HydroNode>,
1043 metadata: HydroIrMetadata,
1044 },
1045
1046 Join {
1047 left: Box<HydroNode>,
1048 right: Box<HydroNode>,
1049 metadata: HydroIrMetadata,
1050 },
1051
1052 Difference {
1053 pos: Box<HydroNode>,
1054 neg: Box<HydroNode>,
1055 metadata: HydroIrMetadata,
1056 },
1057
1058 AntiJoin {
1059 pos: Box<HydroNode>,
1060 neg: Box<HydroNode>,
1061 metadata: HydroIrMetadata,
1062 },
1063
1064 ResolveFutures {
1065 input: Box<HydroNode>,
1066 metadata: HydroIrMetadata,
1067 },
1068 ResolveFuturesOrdered {
1069 input: Box<HydroNode>,
1070 metadata: HydroIrMetadata,
1071 },
1072
1073 Map {
1074 f: DebugExpr,
1075 input: Box<HydroNode>,
1076 metadata: HydroIrMetadata,
1077 },
1078 FlatMap {
1079 f: DebugExpr,
1080 input: Box<HydroNode>,
1081 metadata: HydroIrMetadata,
1082 },
1083 Filter {
1084 f: DebugExpr,
1085 input: Box<HydroNode>,
1086 metadata: HydroIrMetadata,
1087 },
1088 FilterMap {
1089 f: DebugExpr,
1090 input: Box<HydroNode>,
1091 metadata: HydroIrMetadata,
1092 },
1093
1094 DeferTick {
1095 input: Box<HydroNode>,
1096 metadata: HydroIrMetadata,
1097 },
1098 Enumerate {
1099 is_static: bool,
1100 input: Box<HydroNode>,
1101 metadata: HydroIrMetadata,
1102 },
1103 Inspect {
1104 f: DebugExpr,
1105 input: Box<HydroNode>,
1106 metadata: HydroIrMetadata,
1107 },
1108
1109 Unique {
1110 input: Box<HydroNode>,
1111 metadata: HydroIrMetadata,
1112 },
1113
1114 Sort {
1115 input: Box<HydroNode>,
1116 metadata: HydroIrMetadata,
1117 },
1118 Fold {
1119 init: DebugExpr,
1120 acc: DebugExpr,
1121 input: Box<HydroNode>,
1122 metadata: HydroIrMetadata,
1123 },
1124
1125 Scan {
1126 init: DebugExpr,
1127 acc: DebugExpr,
1128 input: Box<HydroNode>,
1129 metadata: HydroIrMetadata,
1130 },
1131 FoldKeyed {
1132 init: DebugExpr,
1133 acc: DebugExpr,
1134 input: Box<HydroNode>,
1135 metadata: HydroIrMetadata,
1136 },
1137
1138 Reduce {
1139 f: DebugExpr,
1140 input: Box<HydroNode>,
1141 metadata: HydroIrMetadata,
1142 },
1143 ReduceKeyed {
1144 f: DebugExpr,
1145 input: Box<HydroNode>,
1146 metadata: HydroIrMetadata,
1147 },
1148 ReduceKeyedWatermark {
1149 f: DebugExpr,
1150 input: Box<HydroNode>,
1151 watermark: Box<HydroNode>,
1152 metadata: HydroIrMetadata,
1153 },
1154
1155 Network {
1156 serialize_fn: Option<DebugExpr>,
1157 instantiate_fn: DebugInstantiate,
1158 deserialize_fn: Option<DebugExpr>,
1159 input: Box<HydroNode>,
1160 metadata: HydroIrMetadata,
1161 },
1162
1163 ExternalInput {
1164 from_external_id: usize,
1165 from_key: usize,
1166 from_many: bool,
1167 codec_type: DebugType,
1168 port_hint: NetworkHint,
1169 instantiate_fn: DebugInstantiate,
1170 deserialize_fn: Option<DebugExpr>,
1171 metadata: HydroIrMetadata,
1172 },
1173
1174 Counter {
1175 tag: String,
1176 duration: DebugExpr,
1177 input: Box<HydroNode>,
1178 metadata: HydroIrMetadata,
1179 },
1180}
1181
1182pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1183pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1184
1185impl HydroNode {
1186 pub fn transform_bottom_up(
1187 &mut self,
1188 transform: &mut impl FnMut(&mut HydroNode),
1189 seen_tees: &mut SeenTees,
1190 check_well_formed: bool,
1191 ) {
1192 self.transform_children(
1193 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1194 seen_tees,
1195 );
1196
1197 transform(self);
1198
1199 let self_location = self.metadata().location_kind.root();
1200
1201 if check_well_formed {
1202 match &*self {
1203 HydroNode::Network { .. } => {}
1204 _ => {
1205 self.input_metadata().iter().for_each(|i| {
1206 if i.location_kind.root() != self_location {
1207 panic!(
1208 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1209 i,
1210 i.location_kind.root(),
1211 self,
1212 self_location
1213 )
1214 }
1215 });
1216 }
1217 }
1218 }
1219 }
1220
1221 #[inline(always)]
1222 pub fn transform_children(
1223 &mut self,
1224 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1225 seen_tees: &mut SeenTees,
1226 ) {
1227 match self {
1228 HydroNode::Placeholder => {
1229 panic!();
1230 }
1231
1232 HydroNode::Source { .. }
1233 | HydroNode::CycleSource { .. }
1234 | HydroNode::ExternalInput { .. } => {}
1235
1236 HydroNode::Tee { inner, .. } => {
1237 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1238 *inner = TeeNode(transformed.clone());
1239 } else {
1240 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1241 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1242 let mut orig = inner.0.replace(HydroNode::Placeholder);
1243 transform(&mut orig, seen_tees);
1244 *transformed_cell.borrow_mut() = orig;
1245 *inner = TeeNode(transformed_cell);
1246 }
1247 }
1248
1249 HydroNode::Persist { inner, .. }
1250 | HydroNode::Unpersist { inner, .. }
1251 | HydroNode::Delta { inner, .. } => {
1252 transform(inner.as_mut(), seen_tees);
1253 }
1254
1255 HydroNode::Chain { first, second, .. } => {
1256 transform(first.as_mut(), seen_tees);
1257 transform(second.as_mut(), seen_tees);
1258 }
1259
1260 HydroNode::CrossSingleton { left, right, .. }
1261 | HydroNode::CrossProduct { left, right, .. }
1262 | HydroNode::Join { left, right, .. } => {
1263 transform(left.as_mut(), seen_tees);
1264 transform(right.as_mut(), seen_tees);
1265 }
1266
1267 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1268 transform(pos.as_mut(), seen_tees);
1269 transform(neg.as_mut(), seen_tees);
1270 }
1271
1272 HydroNode::ReduceKeyedWatermark {
1273 input, watermark, ..
1274 } => {
1275 transform(input.as_mut(), seen_tees);
1276 transform(watermark.as_mut(), seen_tees);
1277 }
1278
1279 HydroNode::Map { input, .. }
1280 | HydroNode::ResolveFutures { input, .. }
1281 | HydroNode::ResolveFuturesOrdered { input, .. }
1282 | HydroNode::FlatMap { input, .. }
1283 | HydroNode::Filter { input, .. }
1284 | HydroNode::FilterMap { input, .. }
1285 | HydroNode::Sort { input, .. }
1286 | HydroNode::DeferTick { input, .. }
1287 | HydroNode::Enumerate { input, .. }
1288 | HydroNode::Inspect { input, .. }
1289 | HydroNode::Unique { input, .. }
1290 | HydroNode::Network { input, .. }
1291 | HydroNode::Fold { input, .. }
1292 | HydroNode::Scan { input, .. }
1293 | HydroNode::FoldKeyed { input, .. }
1294 | HydroNode::Reduce { input, .. }
1295 | HydroNode::ReduceKeyed { input, .. }
1296 | HydroNode::Counter { input, .. } => {
1297 transform(input.as_mut(), seen_tees);
1298 }
1299 }
1300 }
1301
1302 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1303 match self {
1304 HydroNode::Placeholder => HydroNode::Placeholder,
1305 HydroNode::Source { source, metadata } => HydroNode::Source {
1306 source: source.clone(),
1307 metadata: metadata.clone(),
1308 },
1309 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1310 ident: ident.clone(),
1311 metadata: metadata.clone(),
1312 },
1313 HydroNode::Tee { inner, metadata } => {
1314 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1315 HydroNode::Tee {
1316 inner: TeeNode(transformed.clone()),
1317 metadata: metadata.clone(),
1318 }
1319 } else {
1320 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1321 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1322 let cloned = inner.0.borrow().deep_clone(seen_tees);
1323 *new_rc.borrow_mut() = cloned;
1324 HydroNode::Tee {
1325 inner: TeeNode(new_rc),
1326 metadata: metadata.clone(),
1327 }
1328 }
1329 }
1330 HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1331 inner: Box::new(inner.deep_clone(seen_tees)),
1332 metadata: metadata.clone(),
1333 },
1334 HydroNode::Unpersist { inner, metadata } => HydroNode::Unpersist {
1335 inner: Box::new(inner.deep_clone(seen_tees)),
1336 metadata: metadata.clone(),
1337 },
1338 HydroNode::Delta { inner, metadata } => HydroNode::Delta {
1339 inner: Box::new(inner.deep_clone(seen_tees)),
1340 metadata: metadata.clone(),
1341 },
1342 HydroNode::Chain {
1343 first,
1344 second,
1345 metadata,
1346 } => HydroNode::Chain {
1347 first: Box::new(first.deep_clone(seen_tees)),
1348 second: Box::new(second.deep_clone(seen_tees)),
1349 metadata: metadata.clone(),
1350 },
1351 HydroNode::CrossProduct {
1352 left,
1353 right,
1354 metadata,
1355 } => HydroNode::CrossProduct {
1356 left: Box::new(left.deep_clone(seen_tees)),
1357 right: Box::new(right.deep_clone(seen_tees)),
1358 metadata: metadata.clone(),
1359 },
1360 HydroNode::CrossSingleton {
1361 left,
1362 right,
1363 metadata,
1364 } => HydroNode::CrossSingleton {
1365 left: Box::new(left.deep_clone(seen_tees)),
1366 right: Box::new(right.deep_clone(seen_tees)),
1367 metadata: metadata.clone(),
1368 },
1369 HydroNode::Join {
1370 left,
1371 right,
1372 metadata,
1373 } => HydroNode::Join {
1374 left: Box::new(left.deep_clone(seen_tees)),
1375 right: Box::new(right.deep_clone(seen_tees)),
1376 metadata: metadata.clone(),
1377 },
1378 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1379 pos: Box::new(pos.deep_clone(seen_tees)),
1380 neg: Box::new(neg.deep_clone(seen_tees)),
1381 metadata: metadata.clone(),
1382 },
1383 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1384 pos: Box::new(pos.deep_clone(seen_tees)),
1385 neg: Box::new(neg.deep_clone(seen_tees)),
1386 metadata: metadata.clone(),
1387 },
1388 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1389 input: Box::new(input.deep_clone(seen_tees)),
1390 metadata: metadata.clone(),
1391 },
1392 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1393 HydroNode::ResolveFuturesOrdered {
1394 input: Box::new(input.deep_clone(seen_tees)),
1395 metadata: metadata.clone(),
1396 }
1397 }
1398 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1399 f: f.clone(),
1400 input: Box::new(input.deep_clone(seen_tees)),
1401 metadata: metadata.clone(),
1402 },
1403 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1404 f: f.clone(),
1405 input: Box::new(input.deep_clone(seen_tees)),
1406 metadata: metadata.clone(),
1407 },
1408 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1409 f: f.clone(),
1410 input: Box::new(input.deep_clone(seen_tees)),
1411 metadata: metadata.clone(),
1412 },
1413 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1414 f: f.clone(),
1415 input: Box::new(input.deep_clone(seen_tees)),
1416 metadata: metadata.clone(),
1417 },
1418 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1419 input: Box::new(input.deep_clone(seen_tees)),
1420 metadata: metadata.clone(),
1421 },
1422 HydroNode::Enumerate {
1423 is_static,
1424 input,
1425 metadata,
1426 } => HydroNode::Enumerate {
1427 is_static: *is_static,
1428 input: Box::new(input.deep_clone(seen_tees)),
1429 metadata: metadata.clone(),
1430 },
1431 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1432 f: f.clone(),
1433 input: Box::new(input.deep_clone(seen_tees)),
1434 metadata: metadata.clone(),
1435 },
1436 HydroNode::Unique { input, metadata } => HydroNode::Unique {
1437 input: Box::new(input.deep_clone(seen_tees)),
1438 metadata: metadata.clone(),
1439 },
1440 HydroNode::Sort { input, metadata } => HydroNode::Sort {
1441 input: Box::new(input.deep_clone(seen_tees)),
1442 metadata: metadata.clone(),
1443 },
1444 HydroNode::Fold {
1445 init,
1446 acc,
1447 input,
1448 metadata,
1449 } => HydroNode::Fold {
1450 init: init.clone(),
1451 acc: acc.clone(),
1452 input: Box::new(input.deep_clone(seen_tees)),
1453 metadata: metadata.clone(),
1454 },
1455 HydroNode::Scan {
1456 init,
1457 acc,
1458 input,
1459 metadata,
1460 } => HydroNode::Scan {
1461 init: init.clone(),
1462 acc: acc.clone(),
1463 input: Box::new(input.deep_clone(seen_tees)),
1464 metadata: metadata.clone(),
1465 },
1466 HydroNode::FoldKeyed {
1467 init,
1468 acc,
1469 input,
1470 metadata,
1471 } => HydroNode::FoldKeyed {
1472 init: init.clone(),
1473 acc: acc.clone(),
1474 input: Box::new(input.deep_clone(seen_tees)),
1475 metadata: metadata.clone(),
1476 },
1477 HydroNode::ReduceKeyedWatermark {
1478 f,
1479 input,
1480 watermark,
1481 metadata,
1482 } => HydroNode::ReduceKeyedWatermark {
1483 f: f.clone(),
1484 input: Box::new(input.deep_clone(seen_tees)),
1485 watermark: Box::new(watermark.deep_clone(seen_tees)),
1486 metadata: metadata.clone(),
1487 },
1488 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1489 f: f.clone(),
1490 input: Box::new(input.deep_clone(seen_tees)),
1491 metadata: metadata.clone(),
1492 },
1493 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1494 f: f.clone(),
1495 input: Box::new(input.deep_clone(seen_tees)),
1496 metadata: metadata.clone(),
1497 },
1498 HydroNode::Network {
1499 serialize_fn,
1500 instantiate_fn,
1501 deserialize_fn,
1502 input,
1503 metadata,
1504 } => HydroNode::Network {
1505 serialize_fn: serialize_fn.clone(),
1506 instantiate_fn: instantiate_fn.clone(),
1507 deserialize_fn: deserialize_fn.clone(),
1508 input: Box::new(input.deep_clone(seen_tees)),
1509 metadata: metadata.clone(),
1510 },
1511 HydroNode::ExternalInput {
1512 from_external_id,
1513 from_key,
1514 from_many,
1515 codec_type,
1516 port_hint,
1517 instantiate_fn,
1518 deserialize_fn,
1519 metadata,
1520 } => HydroNode::ExternalInput {
1521 from_external_id: *from_external_id,
1522 from_key: *from_key,
1523 from_many: *from_many,
1524 codec_type: codec_type.clone(),
1525 port_hint: *port_hint,
1526 instantiate_fn: instantiate_fn.clone(),
1527 deserialize_fn: deserialize_fn.clone(),
1528 metadata: metadata.clone(),
1529 },
1530 HydroNode::Counter {
1531 tag,
1532 duration,
1533 input,
1534 metadata,
1535 } => HydroNode::Counter {
1536 tag: tag.clone(),
1537 duration: duration.clone(),
1538 input: Box::new(input.deep_clone(seen_tees)),
1539 metadata: metadata.clone(),
1540 },
1541 }
1542 }
1543
1544 #[cfg(feature = "build")]
1545 pub fn emit_core(
1546 &mut self,
1547 builders_or_callback: &mut BuildersOrCallback<
1548 impl FnMut(&mut HydroLeaf, &mut usize),
1549 impl FnMut(&mut HydroNode, &mut usize),
1550 >,
1551 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1552 next_stmt_id: &mut usize,
1553 ) -> (syn::Ident, usize) {
1554 match self {
1555 HydroNode::Placeholder => {
1556 panic!()
1557 }
1558
1559 HydroNode::Persist { inner, .. } => {
1560 let (inner_ident, location) =
1561 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1562
1563 let persist_ident =
1564 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1565
1566 match builders_or_callback {
1567 BuildersOrCallback::Builders(graph_builders) => {
1568 let builder = graph_builders.entry(location).or_default();
1569 builder.add_dfir(
1570 parse_quote! {
1571 #persist_ident = #inner_ident -> persist::<'static>();
1572 },
1573 None,
1574 Some(&next_stmt_id.to_string()),
1575 );
1576 }
1577 BuildersOrCallback::Callback(_, node_callback) => {
1578 node_callback(self, next_stmt_id);
1579 }
1580 }
1581
1582 *next_stmt_id += 1;
1583
1584 (persist_ident, location)
1585 }
1586
1587 HydroNode::Unpersist { .. } => {
1588 panic!(
1589 "Unpersist is a marker node and should have been optimized away. This is likely a compiler bug."
1590 )
1591 }
1592
1593 HydroNode::Delta { inner, .. } => {
1594 let (inner_ident, location) =
1595 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1596
1597 let delta_ident =
1598 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1599
1600 match builders_or_callback {
1601 BuildersOrCallback::Builders(graph_builders) => {
1602 let builder = graph_builders.entry(location).or_default();
1603 builder.add_dfir(
1604 parse_quote! {
1605 #delta_ident = #inner_ident -> multiset_delta();
1606 },
1607 None,
1608 Some(&next_stmt_id.to_string()),
1609 );
1610 }
1611 BuildersOrCallback::Callback(_, node_callback) => {
1612 node_callback(self, next_stmt_id);
1613 }
1614 }
1615
1616 *next_stmt_id += 1;
1617
1618 (delta_ident, location)
1619 }
1620
1621 HydroNode::Source {
1622 source, metadata, ..
1623 } => {
1624 let location_id = metadata.location_kind.root().raw_id();
1625
1626 if let HydroSource::ExternalNetwork() = source {
1627 (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1628 } else {
1629 let source_ident =
1630 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1631
1632 let source_stmt = match source {
1633 HydroSource::Stream(expr) => {
1634 parse_quote! {
1635 #source_ident = source_stream(#expr);
1636 }
1637 }
1638
1639 HydroSource::ExternalNetwork() => {
1640 unreachable!()
1641 }
1642
1643 HydroSource::Iter(expr) => {
1644 parse_quote! {
1645 #source_ident = source_iter(#expr);
1646 }
1647 }
1648
1649 HydroSource::Spin() => {
1650 parse_quote! {
1651 #source_ident = spin();
1652 }
1653 }
1654 };
1655
1656 match builders_or_callback {
1657 BuildersOrCallback::Builders(graph_builders) => {
1658 let builder = graph_builders.entry(location_id).or_default();
1659 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1660 }
1661 BuildersOrCallback::Callback(_, node_callback) => {
1662 node_callback(self, next_stmt_id);
1663 }
1664 }
1665
1666 *next_stmt_id += 1;
1667
1668 (source_ident, location_id)
1669 }
1670 }
1671
1672 HydroNode::CycleSource {
1673 ident, metadata, ..
1674 } => {
1675 let location_id = metadata.location_kind.root().raw_id();
1676
1677 let ident = ident.clone();
1678
1679 match builders_or_callback {
1680 BuildersOrCallback::Builders(_) => {}
1681 BuildersOrCallback::Callback(_, node_callback) => {
1682 node_callback(self, next_stmt_id);
1683 }
1684 }
1685
1686 *next_stmt_id += 1;
1688
1689 (ident, location_id)
1690 }
1691
1692 HydroNode::Tee { inner, .. } => {
1693 let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1694 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1695 {
1696 match builders_or_callback {
1697 BuildersOrCallback::Builders(_) => {}
1698 BuildersOrCallback::Callback(_, node_callback) => {
1699 node_callback(self, next_stmt_id);
1700 }
1701 }
1702
1703 (teed_from.clone(), *inner_location_id)
1704 } else {
1705 let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1706 builders_or_callback,
1707 built_tees,
1708 next_stmt_id,
1709 );
1710
1711 let tee_ident =
1712 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1713
1714 built_tees.insert(
1715 inner.0.as_ref() as *const RefCell<HydroNode>,
1716 (tee_ident.clone(), inner_location_id),
1717 );
1718
1719 match builders_or_callback {
1720 BuildersOrCallback::Builders(graph_builders) => {
1721 let builder = graph_builders.entry(inner_location_id).or_default();
1722 builder.add_dfir(
1723 parse_quote! {
1724 #tee_ident = #inner_ident -> tee();
1725 },
1726 None,
1727 Some(&next_stmt_id.to_string()),
1728 );
1729 }
1730 BuildersOrCallback::Callback(_, node_callback) => {
1731 node_callback(self, next_stmt_id);
1732 }
1733 }
1734
1735 (tee_ident, inner_location_id)
1736 };
1737
1738 *next_stmt_id += 1;
1742 (ret_ident, inner_location_id)
1743 }
1744
1745 HydroNode::Chain { first, second, .. } => {
1746 let (first_ident, first_location_id) =
1747 first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1748 let (second_ident, second_location_id) =
1749 second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1750
1751 let chain_ident =
1752 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1753
1754 match builders_or_callback {
1755 BuildersOrCallback::Builders(graph_builders) => {
1756 assert_eq!(
1757 first_location_id, second_location_id,
1758 "chain inputs must be in the same location"
1759 );
1760 let builder = graph_builders.entry(first_location_id).or_default();
1761 builder.add_dfir(
1762 parse_quote! {
1763 #chain_ident = chain();
1764 #first_ident -> [0]#chain_ident;
1765 #second_ident -> [1]#chain_ident;
1766 },
1767 None,
1768 Some(&next_stmt_id.to_string()),
1769 );
1770 }
1771 BuildersOrCallback::Callback(_, node_callback) => {
1772 node_callback(self, next_stmt_id);
1773 }
1774 }
1775
1776 *next_stmt_id += 1;
1777
1778 (chain_ident, first_location_id)
1779 }
1780
1781 HydroNode::CrossSingleton { left, right, .. } => {
1782 let (left_ident, left_location_id) =
1783 left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1784 let (right_ident, right_location_id) =
1785 right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1786
1787 let cross_ident =
1788 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1789
1790 match builders_or_callback {
1791 BuildersOrCallback::Builders(graph_builders) => {
1792 assert_eq!(
1793 left_location_id, right_location_id,
1794 "cross_singleton inputs must be in the same location"
1795 );
1796
1797 let builder = graph_builders.entry(left_location_id).or_default();
1798 builder.add_dfir(
1799 parse_quote! {
1800 #cross_ident = cross_singleton();
1801 #left_ident -> [input]#cross_ident;
1802 #right_ident -> [single]#cross_ident;
1803 },
1804 None,
1805 Some(&next_stmt_id.to_string()),
1806 );
1807 }
1808 BuildersOrCallback::Callback(_, node_callback) => {
1809 node_callback(self, next_stmt_id);
1810 }
1811 }
1812
1813 *next_stmt_id += 1;
1814
1815 (cross_ident, left_location_id)
1816 }
1817
1818 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1819 let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1820 parse_quote!(cross_join_multiset)
1821 } else {
1822 parse_quote!(join_multiset)
1823 };
1824
1825 let (HydroNode::CrossProduct { left, right, .. }
1826 | HydroNode::Join { left, right, .. }) = self
1827 else {
1828 unreachable!()
1829 };
1830
1831 let (left_inner, left_lifetime) =
1832 if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1833 (left, quote!('static))
1834 } else {
1835 (left, quote!('tick))
1836 };
1837
1838 let (right_inner, right_lifetime) =
1839 if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
1840 (right, quote!('static))
1841 } else {
1842 (right, quote!('tick))
1843 };
1844
1845 let (left_ident, left_location_id) =
1846 left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1847 let (right_ident, right_location_id) =
1848 right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1849
1850 let stream_ident =
1851 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1852
1853 match builders_or_callback {
1854 BuildersOrCallback::Builders(graph_builders) => {
1855 assert_eq!(
1856 left_location_id, right_location_id,
1857 "join / cross product inputs must be in the same location"
1858 );
1859
1860 let builder = graph_builders.entry(left_location_id).or_default();
1861 builder.add_dfir(
1862 parse_quote! {
1863 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
1864 #left_ident -> [0]#stream_ident;
1865 #right_ident -> [1]#stream_ident;
1866 },
1867 None,
1868 Some(&next_stmt_id.to_string()),
1869 );
1870 }
1871 BuildersOrCallback::Callback(_, node_callback) => {
1872 node_callback(self, next_stmt_id);
1873 }
1874 }
1875
1876 *next_stmt_id += 1;
1877
1878 (stream_ident, left_location_id)
1879 }
1880
1881 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
1882 let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
1883 parse_quote!(difference_multiset)
1884 } else {
1885 parse_quote!(anti_join_multiset)
1886 };
1887
1888 let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
1889 self
1890 else {
1891 unreachable!()
1892 };
1893
1894 let (neg, neg_lifetime) =
1895 if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
1896 (neg, quote!('static))
1897 } else {
1898 (neg, quote!('tick))
1899 };
1900
1901 let (pos_ident, pos_location_id) =
1902 pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
1903 let (neg_ident, neg_location_id) =
1904 neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
1905
1906 let stream_ident =
1907 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1908
1909 match builders_or_callback {
1910 BuildersOrCallback::Builders(graph_builders) => {
1911 assert_eq!(
1912 pos_location_id, neg_location_id,
1913 "difference / anti join inputs must be in the same location"
1914 );
1915
1916 let builder = graph_builders.entry(pos_location_id).or_default();
1917 builder.add_dfir(
1918 parse_quote! {
1919 #stream_ident = #operator::<'tick, #neg_lifetime>();
1920 #pos_ident -> [pos]#stream_ident;
1921 #neg_ident -> [neg]#stream_ident;
1922 },
1923 None,
1924 Some(&next_stmt_id.to_string()),
1925 );
1926 }
1927 BuildersOrCallback::Callback(_, node_callback) => {
1928 node_callback(self, next_stmt_id);
1929 }
1930 }
1931
1932 *next_stmt_id += 1;
1933
1934 (stream_ident, pos_location_id)
1935 }
1936
1937 HydroNode::ResolveFutures { input, .. } => {
1938 let (input_ident, input_location_id) =
1939 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1940
1941 let futures_ident =
1942 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1943
1944 match builders_or_callback {
1945 BuildersOrCallback::Builders(graph_builders) => {
1946 let builder = graph_builders.entry(input_location_id).or_default();
1947 builder.add_dfir(
1948 parse_quote! {
1949 #futures_ident = #input_ident -> resolve_futures();
1950 },
1951 None,
1952 Some(&next_stmt_id.to_string()),
1953 );
1954 }
1955 BuildersOrCallback::Callback(_, node_callback) => {
1956 node_callback(self, next_stmt_id);
1957 }
1958 }
1959
1960 *next_stmt_id += 1;
1961
1962 (futures_ident, input_location_id)
1963 }
1964
1965 HydroNode::ResolveFuturesOrdered { input, .. } => {
1966 let (input_ident, input_location_id) =
1967 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1968
1969 let futures_ident =
1970 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1971
1972 match builders_or_callback {
1973 BuildersOrCallback::Builders(graph_builders) => {
1974 let builder = graph_builders.entry(input_location_id).or_default();
1975 builder.add_dfir(
1976 parse_quote! {
1977 #futures_ident = #input_ident -> resolve_futures_ordered();
1978 },
1979 None,
1980 Some(&next_stmt_id.to_string()),
1981 );
1982 }
1983 BuildersOrCallback::Callback(_, node_callback) => {
1984 node_callback(self, next_stmt_id);
1985 }
1986 }
1987
1988 *next_stmt_id += 1;
1989
1990 (futures_ident, input_location_id)
1991 }
1992
1993 HydroNode::Map { f, input, .. } => {
1994 let (input_ident, input_location_id) =
1995 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1996
1997 let map_ident =
1998 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1999
2000 match builders_or_callback {
2001 BuildersOrCallback::Builders(graph_builders) => {
2002 let builder = graph_builders.entry(input_location_id).or_default();
2003 builder.add_dfir(
2004 parse_quote! {
2005 #map_ident = #input_ident -> map(#f);
2006 },
2007 None,
2008 Some(&next_stmt_id.to_string()),
2009 );
2010 }
2011 BuildersOrCallback::Callback(_, node_callback) => {
2012 node_callback(self, next_stmt_id);
2013 }
2014 }
2015
2016 *next_stmt_id += 1;
2017
2018 (map_ident, input_location_id)
2019 }
2020
2021 HydroNode::FlatMap { f, input, .. } => {
2022 let (input_ident, input_location_id) =
2023 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2024
2025 let flat_map_ident =
2026 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2027
2028 match builders_or_callback {
2029 BuildersOrCallback::Builders(graph_builders) => {
2030 let builder = graph_builders.entry(input_location_id).or_default();
2031 builder.add_dfir(
2032 parse_quote! {
2033 #flat_map_ident = #input_ident -> flat_map(#f);
2034 },
2035 None,
2036 Some(&next_stmt_id.to_string()),
2037 );
2038 }
2039 BuildersOrCallback::Callback(_, node_callback) => {
2040 node_callback(self, next_stmt_id);
2041 }
2042 }
2043
2044 *next_stmt_id += 1;
2045
2046 (flat_map_ident, input_location_id)
2047 }
2048
2049 HydroNode::Filter { f, input, .. } => {
2050 let (input_ident, input_location_id) =
2051 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2052
2053 let filter_ident =
2054 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2055
2056 match builders_or_callback {
2057 BuildersOrCallback::Builders(graph_builders) => {
2058 let builder = graph_builders.entry(input_location_id).or_default();
2059 builder.add_dfir(
2060 parse_quote! {
2061 #filter_ident = #input_ident -> filter(#f);
2062 },
2063 None,
2064 Some(&next_stmt_id.to_string()),
2065 );
2066 }
2067 BuildersOrCallback::Callback(_, node_callback) => {
2068 node_callback(self, next_stmt_id);
2069 }
2070 }
2071
2072 *next_stmt_id += 1;
2073
2074 (filter_ident, input_location_id)
2075 }
2076
2077 HydroNode::FilterMap { f, input, .. } => {
2078 let (input_ident, input_location_id) =
2079 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2080
2081 let filter_map_ident =
2082 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2083
2084 match builders_or_callback {
2085 BuildersOrCallback::Builders(graph_builders) => {
2086 let builder = graph_builders.entry(input_location_id).or_default();
2087 builder.add_dfir(
2088 parse_quote! {
2089 #filter_map_ident = #input_ident -> filter_map(#f);
2090 },
2091 None,
2092 Some(&next_stmt_id.to_string()),
2093 );
2094 }
2095 BuildersOrCallback::Callback(_, node_callback) => {
2096 node_callback(self, next_stmt_id);
2097 }
2098 }
2099
2100 *next_stmt_id += 1;
2101
2102 (filter_map_ident, input_location_id)
2103 }
2104
2105 HydroNode::Sort { input, .. } => {
2106 let (input_ident, input_location_id) =
2107 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2108
2109 let sort_ident =
2110 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2111
2112 match builders_or_callback {
2113 BuildersOrCallback::Builders(graph_builders) => {
2114 let builder = graph_builders.entry(input_location_id).or_default();
2115 builder.add_dfir(
2116 parse_quote! {
2117 #sort_ident = #input_ident -> sort();
2118 },
2119 None,
2120 Some(&next_stmt_id.to_string()),
2121 );
2122 }
2123 BuildersOrCallback::Callback(_, node_callback) => {
2124 node_callback(self, next_stmt_id);
2125 }
2126 }
2127
2128 *next_stmt_id += 1;
2129
2130 (sort_ident, input_location_id)
2131 }
2132
2133 HydroNode::DeferTick { input, .. } => {
2134 let (input_ident, input_location_id) =
2135 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2136
2137 let defer_tick_ident =
2138 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2139
2140 match builders_or_callback {
2141 BuildersOrCallback::Builders(graph_builders) => {
2142 let builder = graph_builders.entry(input_location_id).or_default();
2143 builder.add_dfir(
2144 parse_quote! {
2145 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2146 },
2147 None,
2148 Some(&next_stmt_id.to_string()),
2149 );
2150 }
2151 BuildersOrCallback::Callback(_, node_callback) => {
2152 node_callback(self, next_stmt_id);
2153 }
2154 }
2155
2156 *next_stmt_id += 1;
2157
2158 (defer_tick_ident, input_location_id)
2159 }
2160
2161 HydroNode::Enumerate {
2162 is_static, input, ..
2163 } => {
2164 let (input_ident, input_location_id) =
2165 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2166
2167 let enumerate_ident =
2168 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2169
2170 match builders_or_callback {
2171 BuildersOrCallback::Builders(graph_builders) => {
2172 let builder = graph_builders.entry(input_location_id).or_default();
2173 let lifetime = if *is_static {
2174 quote!('static)
2175 } else {
2176 quote!('tick)
2177 };
2178 builder.add_dfir(
2179 parse_quote! {
2180 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2181 },
2182 None,
2183 Some(&next_stmt_id.to_string()),
2184 );
2185 }
2186 BuildersOrCallback::Callback(_, node_callback) => {
2187 node_callback(self, next_stmt_id);
2188 }
2189 }
2190
2191 *next_stmt_id += 1;
2192
2193 (enumerate_ident, input_location_id)
2194 }
2195
2196 HydroNode::Inspect { f, input, .. } => {
2197 let (input_ident, input_location_id) =
2198 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2199
2200 let inspect_ident =
2201 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2202
2203 match builders_or_callback {
2204 BuildersOrCallback::Builders(graph_builders) => {
2205 let builder = graph_builders.entry(input_location_id).or_default();
2206 builder.add_dfir(
2207 parse_quote! {
2208 #inspect_ident = #input_ident -> inspect(#f);
2209 },
2210 None,
2211 Some(&next_stmt_id.to_string()),
2212 );
2213 }
2214 BuildersOrCallback::Callback(_, node_callback) => {
2215 node_callback(self, next_stmt_id);
2216 }
2217 }
2218
2219 *next_stmt_id += 1;
2220
2221 (inspect_ident, input_location_id)
2222 }
2223
2224 HydroNode::Unique { input, .. } => {
2225 let (input_ident, input_location_id) =
2226 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2227
2228 let unique_ident =
2229 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2230
2231 match builders_or_callback {
2232 BuildersOrCallback::Builders(graph_builders) => {
2233 let builder = graph_builders.entry(input_location_id).or_default();
2234 builder.add_dfir(
2235 parse_quote! {
2236 #unique_ident = #input_ident -> unique::<'tick>();
2237 },
2238 None,
2239 Some(&next_stmt_id.to_string()),
2240 );
2241 }
2242 BuildersOrCallback::Callback(_, node_callback) => {
2243 node_callback(self, next_stmt_id);
2244 }
2245 }
2246
2247 *next_stmt_id += 1;
2248
2249 (unique_ident, input_location_id)
2250 }
2251
2252 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2253 let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2254 parse_quote!(fold)
2255 } else if matches!(self, HydroNode::Scan { .. }) {
2256 parse_quote!(scan)
2257 } else {
2258 parse_quote!(fold_keyed)
2259 };
2260
2261 let (HydroNode::Fold {
2262 init, acc, input, ..
2263 }
2264 | HydroNode::FoldKeyed {
2265 init, acc, input, ..
2266 }
2267 | HydroNode::Scan {
2268 init, acc, input, ..
2269 }) = self
2270 else {
2271 unreachable!()
2272 };
2273
2274 let (input, lifetime) =
2275 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2276 (input, quote!('static))
2277 } else {
2278 (input, quote!('tick))
2279 };
2280
2281 let (input_ident, input_location_id) =
2282 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2283
2284 let fold_ident =
2285 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2286
2287 match builders_or_callback {
2288 BuildersOrCallback::Builders(graph_builders) => {
2289 let builder = graph_builders.entry(input_location_id).or_default();
2290 builder.add_dfir(
2291 parse_quote! {
2292 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
2293 },
2294 None,
2295 Some(&next_stmt_id.to_string()),
2296 );
2297 }
2298 BuildersOrCallback::Callback(_, node_callback) => {
2299 node_callback(self, next_stmt_id);
2300 }
2301 }
2302
2303 *next_stmt_id += 1;
2304
2305 (fold_ident, input_location_id)
2306 }
2307
2308 HydroNode::ReduceKeyedWatermark {
2309 f,
2310 input,
2311 watermark,
2312 ..
2313 } => {
2314 let (input, lifetime) =
2315 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2316 (input, quote!('static))
2317 } else {
2318 (input, quote!('tick))
2319 };
2320
2321 let (input_ident, input_location_id) =
2322 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2323
2324 let (watermark_ident, watermark_location_id) =
2325 watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
2326
2327 let chain_ident = syn::Ident::new(
2328 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
2329 Span::call_site(),
2330 );
2331
2332 let fold_ident =
2333 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2334
2335 match builders_or_callback {
2336 BuildersOrCallback::Builders(graph_builders) => {
2337 assert_eq!(
2338 input_location_id, watermark_location_id,
2339 "ReduceKeyedWatermark inputs must be in the same location"
2340 );
2341
2342 let builder = graph_builders.entry(input_location_id).or_default();
2343 builder.add_dfir(
2348 parse_quote! {
2349 #chain_ident = chain();
2350 #input_ident
2351 -> map(|x| (Some(x), None))
2352 -> [0]#chain_ident;
2353 #watermark_ident
2354 -> map(|watermark| (None, Some(watermark)))
2355 -> [1]#chain_ident;
2356
2357 #fold_ident = #chain_ident
2358 -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
2359 let __reduce_keyed_fn = #f;
2360 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
2361 if let Some((k, v)) = opt_payload {
2362 if let Some(curr_watermark) = *opt_curr_watermark {
2363 if k <= curr_watermark {
2364 return;
2365 }
2366 }
2367 match map.entry(k) {
2368 ::std::collections::hash_map::Entry::Vacant(e) => {
2369 e.insert(v);
2370 }
2371 ::std::collections::hash_map::Entry::Occupied(mut e) => {
2372 __reduce_keyed_fn(e.get_mut(), v);
2373 }
2374 }
2375 } else {
2376 let watermark = opt_watermark.unwrap();
2377 if let Some(curr_watermark) = *opt_curr_watermark {
2378 if watermark <= curr_watermark {
2379 return;
2380 }
2381 }
2382 *opt_curr_watermark = opt_watermark;
2383 map.retain(|k, _| *k > watermark);
2384 }
2385 }
2386 })
2387 -> flat_map(|(map, _curr_watermark)| map);
2388 },
2389 None,
2390 Some(&next_stmt_id.to_string()),
2391 );
2392 }
2393 BuildersOrCallback::Callback(_, node_callback) => {
2394 node_callback(self, next_stmt_id);
2395 }
2396 }
2397
2398 *next_stmt_id += 1;
2399
2400 (fold_ident, input_location_id)
2401 }
2402
2403 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
2404 let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
2405 parse_quote!(reduce)
2406 } else {
2407 parse_quote!(reduce_keyed)
2408 };
2409
2410 let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
2411 self
2412 else {
2413 unreachable!()
2414 };
2415
2416 let (input, lifetime) =
2417 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2418 (input, quote!('static))
2419 } else {
2420 (input, quote!('tick))
2421 };
2422
2423 let (input_ident, input_location_id) =
2424 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2425
2426 let reduce_ident =
2427 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2428
2429 match builders_or_callback {
2430 BuildersOrCallback::Builders(graph_builders) => {
2431 let builder = graph_builders.entry(input_location_id).or_default();
2432 builder.add_dfir(
2433 parse_quote! {
2434 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
2435 },
2436 None,
2437 Some(&next_stmt_id.to_string()),
2438 );
2439 }
2440 BuildersOrCallback::Callback(_, node_callback) => {
2441 node_callback(self, next_stmt_id);
2442 }
2443 }
2444
2445 *next_stmt_id += 1;
2446
2447 (reduce_ident, input_location_id)
2448 }
2449
2450 HydroNode::Network {
2451 serialize_fn: serialize_pipeline,
2452 instantiate_fn,
2453 deserialize_fn: deserialize_pipeline,
2454 input,
2455 metadata,
2456 ..
2457 } => {
2458 let (input_ident, input_location_id) =
2459 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2460
2461 let to_id = metadata.location_kind.root().raw_id();
2462
2463 let receiver_stream_ident =
2464 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2465
2466 match builders_or_callback {
2467 BuildersOrCallback::Builders(graph_builders) => {
2468 let (sink_expr, source_expr) = match instantiate_fn {
2469 DebugInstantiate::Building => (
2470 syn::parse_quote!(DUMMY_SINK),
2471 syn::parse_quote!(DUMMY_SOURCE),
2472 ),
2473
2474 DebugInstantiate::Finalized(finalized) => {
2475 (finalized.sink.clone(), finalized.source.clone())
2476 }
2477 };
2478
2479 let sender_builder = graph_builders.entry(input_location_id).or_default();
2480 if let Some(serialize_pipeline) = serialize_pipeline {
2481 sender_builder.add_dfir(
2482 parse_quote! {
2483 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
2484 },
2485 None,
2486 Some(&format!("send{}", next_stmt_id)),
2488 );
2489 } else {
2490 sender_builder.add_dfir(
2491 parse_quote! {
2492 #input_ident -> dest_sink(#sink_expr);
2493 },
2494 None,
2495 Some(&format!("send{}", next_stmt_id)),
2496 );
2497 }
2498
2499 let receiver_builder = graph_builders.entry(to_id).or_default();
2500 if let Some(deserialize_pipeline) = deserialize_pipeline {
2501 receiver_builder.add_dfir(parse_quote! {
2502 #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2503 }, None, Some(&format!("recv{}", next_stmt_id)));
2504 } else {
2505 receiver_builder.add_dfir(
2506 parse_quote! {
2507 #receiver_stream_ident = source_stream(#source_expr);
2508 },
2509 None,
2510 Some(&format!("recv{}", next_stmt_id)),
2511 );
2512 }
2513 }
2514 BuildersOrCallback::Callback(_, node_callback) => {
2515 node_callback(self, next_stmt_id);
2516 }
2517 }
2518
2519 *next_stmt_id += 1;
2520
2521 (receiver_stream_ident, to_id)
2522 }
2523
2524 HydroNode::ExternalInput {
2525 instantiate_fn,
2526 deserialize_fn: deserialize_pipeline,
2527 metadata,
2528 ..
2529 } => {
2530 let to_id = metadata.location_kind.root().raw_id();
2531
2532 let receiver_stream_ident =
2533 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2534
2535 match builders_or_callback {
2536 BuildersOrCallback::Builders(graph_builders) => {
2537 let (_, source_expr) = match instantiate_fn {
2538 DebugInstantiate::Building => (
2539 syn::parse_quote!(DUMMY_SINK),
2540 syn::parse_quote!(DUMMY_SOURCE),
2541 ),
2542
2543 DebugInstantiate::Finalized(finalized) => {
2544 (finalized.sink.clone(), finalized.source.clone())
2545 }
2546 };
2547
2548 let receiver_builder = graph_builders.entry(to_id).or_default();
2549 if let Some(deserialize_pipeline) = deserialize_pipeline {
2550 receiver_builder.add_dfir(parse_quote! {
2551 #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2552 }, None, Some(&format!("recv{}", next_stmt_id)));
2553 } else {
2554 receiver_builder.add_dfir(
2555 parse_quote! {
2556 #receiver_stream_ident = source_stream(#source_expr);
2557 },
2558 None,
2559 Some(&format!("recv{}", next_stmt_id)),
2560 );
2561 }
2562 }
2563 BuildersOrCallback::Callback(_, node_callback) => {
2564 node_callback(self, next_stmt_id);
2565 }
2566 }
2567
2568 *next_stmt_id += 1;
2569
2570 (receiver_stream_ident, to_id)
2571 }
2572
2573 HydroNode::Counter {
2574 tag,
2575 duration,
2576 input,
2577 ..
2578 } => {
2579 let (input_ident, input_location_id) =
2580 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2581
2582 let counter_ident =
2583 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2584
2585 match builders_or_callback {
2586 BuildersOrCallback::Builders(graph_builders) => {
2587 let builder = graph_builders.entry(input_location_id).or_default();
2588 builder.add_dfir(
2589 parse_quote! {
2590 #counter_ident = #input_ident -> _counter(#tag, #duration);
2591 },
2592 None,
2593 Some(&next_stmt_id.to_string()),
2594 );
2595 }
2596 BuildersOrCallback::Callback(_, node_callback) => {
2597 node_callback(self, next_stmt_id);
2598 }
2599 }
2600
2601 *next_stmt_id += 1;
2602
2603 (counter_ident, input_location_id)
2604 }
2605 }
2606 }
2607
2608 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
2609 match self {
2610 HydroNode::Placeholder => {
2611 panic!()
2612 }
2613 HydroNode::Source { source, .. } => match source {
2614 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
2615 HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
2616 },
2617 HydroNode::CycleSource { .. }
2618 | HydroNode::Tee { .. }
2619 | HydroNode::Persist { .. }
2620 | HydroNode::Unpersist { .. }
2621 | HydroNode::Delta { .. }
2622 | HydroNode::Chain { .. }
2623 | HydroNode::CrossProduct { .. }
2624 | HydroNode::CrossSingleton { .. }
2625 | HydroNode::ResolveFutures { .. }
2626 | HydroNode::ResolveFuturesOrdered { .. }
2627 | HydroNode::Join { .. }
2628 | HydroNode::Difference { .. }
2629 | HydroNode::AntiJoin { .. }
2630 | HydroNode::DeferTick { .. }
2631 | HydroNode::Enumerate { .. }
2632 | HydroNode::Unique { .. }
2633 | HydroNode::Sort { .. } => {}
2634 HydroNode::Map { f, .. }
2635 | HydroNode::FlatMap { f, .. }
2636 | HydroNode::Filter { f, .. }
2637 | HydroNode::FilterMap { f, .. }
2638 | HydroNode::Inspect { f, .. }
2639 | HydroNode::Reduce { f, .. }
2640 | HydroNode::ReduceKeyed { f, .. }
2641 | HydroNode::ReduceKeyedWatermark { f, .. } => {
2642 transform(f);
2643 }
2644 HydroNode::Fold { init, acc, .. }
2645 | HydroNode::Scan { init, acc, .. }
2646 | HydroNode::FoldKeyed { init, acc, .. } => {
2647 transform(init);
2648 transform(acc);
2649 }
2650 HydroNode::Network {
2651 serialize_fn,
2652 deserialize_fn,
2653 ..
2654 } => {
2655 if let Some(serialize_fn) = serialize_fn {
2656 transform(serialize_fn);
2657 }
2658 if let Some(deserialize_fn) = deserialize_fn {
2659 transform(deserialize_fn);
2660 }
2661 }
2662 HydroNode::ExternalInput { deserialize_fn, .. } => {
2663 if let Some(deserialize_fn) = deserialize_fn {
2664 transform(deserialize_fn);
2665 }
2666 }
2667 HydroNode::Counter { duration, .. } => {
2668 transform(duration);
2669 }
2670 }
2671 }
2672
2673 pub fn metadata(&self) -> &HydroIrMetadata {
2674 match self {
2675 HydroNode::Placeholder => {
2676 panic!()
2677 }
2678 HydroNode::Source { metadata, .. } => metadata,
2679 HydroNode::CycleSource { metadata, .. } => metadata,
2680 HydroNode::Tee { metadata, .. } => metadata,
2681 HydroNode::Persist { metadata, .. } => metadata,
2682 HydroNode::Unpersist { metadata, .. } => metadata,
2683 HydroNode::Delta { metadata, .. } => metadata,
2684 HydroNode::Chain { metadata, .. } => metadata,
2685 HydroNode::CrossProduct { metadata, .. } => metadata,
2686 HydroNode::CrossSingleton { metadata, .. } => metadata,
2687 HydroNode::Join { metadata, .. } => metadata,
2688 HydroNode::Difference { metadata, .. } => metadata,
2689 HydroNode::AntiJoin { metadata, .. } => metadata,
2690 HydroNode::ResolveFutures { metadata, .. } => metadata,
2691 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2692 HydroNode::Map { metadata, .. } => metadata,
2693 HydroNode::FlatMap { metadata, .. } => metadata,
2694 HydroNode::Filter { metadata, .. } => metadata,
2695 HydroNode::FilterMap { metadata, .. } => metadata,
2696 HydroNode::DeferTick { metadata, .. } => metadata,
2697 HydroNode::Enumerate { metadata, .. } => metadata,
2698 HydroNode::Inspect { metadata, .. } => metadata,
2699 HydroNode::Unique { metadata, .. } => metadata,
2700 HydroNode::Sort { metadata, .. } => metadata,
2701 HydroNode::Scan { metadata, .. } => metadata,
2702 HydroNode::Fold { metadata, .. } => metadata,
2703 HydroNode::FoldKeyed { metadata, .. } => metadata,
2704 HydroNode::Reduce { metadata, .. } => metadata,
2705 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2706 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2707 HydroNode::ExternalInput { metadata, .. } => metadata,
2708 HydroNode::Network { metadata, .. } => metadata,
2709 HydroNode::Counter { metadata, .. } => metadata,
2710 }
2711 }
2712
2713 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2714 match self {
2715 HydroNode::Placeholder => {
2716 panic!()
2717 }
2718 HydroNode::Source { metadata, .. } => metadata,
2719 HydroNode::CycleSource { metadata, .. } => metadata,
2720 HydroNode::Tee { metadata, .. } => metadata,
2721 HydroNode::Persist { metadata, .. } => metadata,
2722 HydroNode::Unpersist { metadata, .. } => metadata,
2723 HydroNode::Delta { metadata, .. } => metadata,
2724 HydroNode::Chain { metadata, .. } => metadata,
2725 HydroNode::CrossProduct { metadata, .. } => metadata,
2726 HydroNode::CrossSingleton { metadata, .. } => metadata,
2727 HydroNode::Join { metadata, .. } => metadata,
2728 HydroNode::Difference { metadata, .. } => metadata,
2729 HydroNode::AntiJoin { metadata, .. } => metadata,
2730 HydroNode::ResolveFutures { metadata, .. } => metadata,
2731 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2732 HydroNode::Map { metadata, .. } => metadata,
2733 HydroNode::FlatMap { metadata, .. } => metadata,
2734 HydroNode::Filter { metadata, .. } => metadata,
2735 HydroNode::FilterMap { metadata, .. } => metadata,
2736 HydroNode::DeferTick { metadata, .. } => metadata,
2737 HydroNode::Enumerate { metadata, .. } => metadata,
2738 HydroNode::Inspect { metadata, .. } => metadata,
2739 HydroNode::Unique { metadata, .. } => metadata,
2740 HydroNode::Sort { metadata, .. } => metadata,
2741 HydroNode::Scan { metadata, .. } => metadata,
2742 HydroNode::Fold { metadata, .. } => metadata,
2743 HydroNode::FoldKeyed { metadata, .. } => metadata,
2744 HydroNode::Reduce { metadata, .. } => metadata,
2745 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2746 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2747 HydroNode::ExternalInput { metadata, .. } => metadata,
2748 HydroNode::Network { metadata, .. } => metadata,
2749 HydroNode::Counter { metadata, .. } => metadata,
2750 }
2751 }
2752
2753 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
2754 match self {
2755 HydroNode::Placeholder => {
2756 panic!()
2757 }
2758 HydroNode::Source { .. }
2759 | HydroNode::ExternalInput { .. }
2760 | HydroNode::CycleSource { .. } | HydroNode::Tee { .. } => {
2762 vec![]
2763 }
2764 HydroNode::Persist { inner, .. }
2765 | HydroNode::Unpersist { inner, .. }
2766 | HydroNode::Delta { inner, .. } => {
2767 vec![inner.metadata()]
2768 }
2769 HydroNode::Chain { first, second, .. } => {
2770 vec![first.metadata(), second.metadata()]
2771 }
2772 HydroNode::CrossProduct { left, right, .. }
2773 | HydroNode::CrossSingleton { left, right, .. }
2774 | HydroNode::Join { left, right, .. } => {
2775 vec![left.metadata(), right.metadata()]
2776 }
2777 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2778 vec![pos.metadata(), neg.metadata()]
2779 }
2780 HydroNode::Map { input, .. }
2781 | HydroNode::FlatMap { input, .. }
2782 | HydroNode::Filter { input, .. }
2783 | HydroNode::FilterMap { input, .. }
2784 | HydroNode::Sort { input, .. }
2785 | HydroNode::DeferTick { input, .. }
2786 | HydroNode::Enumerate { input, .. }
2787 | HydroNode::Inspect { input, .. }
2788 | HydroNode::Unique { input, .. }
2789 | HydroNode::Network { input, .. }
2790 | HydroNode::Counter { input, .. }
2791 | HydroNode::ResolveFutures { input, .. }
2792 | HydroNode::ResolveFuturesOrdered { input, .. } => {
2793 vec![input.metadata()]
2794 }
2795 HydroNode::Fold { input, .. }
2796 | HydroNode::FoldKeyed { input, .. }
2797 | HydroNode::Reduce { input, .. }
2798 | HydroNode::ReduceKeyed { input, .. }
2799 | HydroNode::Scan { input, .. } => {
2800 if let HydroNode::Persist { inner, .. } = input.as_ref() {
2802 vec![inner.metadata()]
2803 } else {
2804 vec![input.metadata()]
2805 }
2806 }
2807 HydroNode::ReduceKeyedWatermark { input, watermark, .. } => {
2808 if let HydroNode::Persist { inner, .. } = input.as_ref() {
2810 vec![inner.metadata(), watermark.metadata()]
2811 } else {
2812 vec![input.metadata(), watermark.metadata()]
2813 }
2814 }
2815 }
2816 }
2817
2818 pub fn print_root(&self) -> String {
2819 match self {
2820 HydroNode::Placeholder => {
2821 panic!()
2822 }
2823 HydroNode::Source { source, .. } => format!("Source({:?})", source),
2824 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
2825 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
2826 HydroNode::Persist { .. } => "Persist()".to_string(),
2827 HydroNode::Unpersist { .. } => "Unpersist()".to_string(),
2828 HydroNode::Delta { .. } => "Delta()".to_string(),
2829 HydroNode::Chain { first, second, .. } => {
2830 format!("Chain({}, {})", first.print_root(), second.print_root())
2831 }
2832 HydroNode::CrossProduct { left, right, .. } => {
2833 format!(
2834 "CrossProduct({}, {})",
2835 left.print_root(),
2836 right.print_root()
2837 )
2838 }
2839 HydroNode::CrossSingleton { left, right, .. } => {
2840 format!(
2841 "CrossSingleton({}, {})",
2842 left.print_root(),
2843 right.print_root()
2844 )
2845 }
2846 HydroNode::Join { left, right, .. } => {
2847 format!("Join({}, {})", left.print_root(), right.print_root())
2848 }
2849 HydroNode::Difference { pos, neg, .. } => {
2850 format!("Difference({}, {})", pos.print_root(), neg.print_root())
2851 }
2852 HydroNode::AntiJoin { pos, neg, .. } => {
2853 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
2854 }
2855 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
2856 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
2857 HydroNode::Map { f, .. } => format!("Map({:?})", f),
2858 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
2859 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
2860 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
2861 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
2862 HydroNode::Enumerate { is_static, .. } => format!("Enumerate({:?})", is_static),
2863 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
2864 HydroNode::Unique { .. } => "Unique()".to_string(),
2865 HydroNode::Sort { .. } => "Sort()".to_string(),
2866 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
2867 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
2868 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
2869 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
2870 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
2871 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
2872 HydroNode::Network { .. } => "Network()".to_string(),
2873 HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
2874 HydroNode::Counter { tag, duration, .. } => {
2875 format!("Counter({:?}, {:?})", tag, duration)
2876 }
2877 }
2878 }
2879}
2880
/// Resolves both endpoints of a network edge and asks the deployment backend
/// `D` for the code and connection hook that realize it.
///
/// The endpoint kinds select which family of `D` functions is used:
/// process→process (`o2o_*`), process→cluster (`o2m_*`),
/// cluster→process (`m2o_*`) and cluster→cluster (`m2m_*`). In each case a
/// port is allocated on the sender and on the receiver before the backend is
/// queried.
///
/// # Parameters
///
/// * `from_location` / `to_location` - sender and receiver of the edge; each
///   must be a `Process` or a `Cluster`.
/// * `processes` / `clusters` - instantiated nodes, keyed by raw location id.
/// * `compile_env` - forwarded unchanged to the `*_sink_source` functions.
///
/// # Returns
///
/// `(sink, source, connect_fn)`: the two expressions produced by the
/// `*_sink_source` call, followed by the closure produced by the matching
/// `*_connect` call.
///
/// # Panics
///
/// * If a referenced process or cluster id is missing from its map.
/// * If either endpoint is a `Tick` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    compile_env: &D::CompileEnv,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    /// Looks up an instantiated node, panicking with the kind and id when it is absent.
    fn instantiated<'m, T>(nodes: &'m HashMap<usize, T>, id: &usize, kind: &str) -> &'m T {
        nodes.get(id).unwrap_or_else(|| {
            panic!("A {} used in the graph was not instantiated: {}", kind, id)
        })
    }

    // Nodes are only ever handed to `D` by reference, so they are borrowed
    // straight out of the maps instead of being cloned first.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let from_node = instantiated(processes, from, "process");
            let to_node = instantiated(processes, to, "process");

            let sink_port = D::allocate_process_port(from_node);
            let source_port = D::allocate_process_port(to_node);

            (
                D::o2o_sink_source(compile_env, from_node, &sink_port, to_node, &source_port),
                D::o2o_connect(from_node, &sink_port, to_node, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let from_node = instantiated(processes, from, "process");
            let to_node = instantiated(clusters, to, "cluster");

            let sink_port = D::allocate_process_port(from_node);
            let source_port = D::allocate_cluster_port(to_node);

            (
                D::o2m_sink_source(compile_env, from_node, &sink_port, to_node, &source_port),
                D::o2m_connect(from_node, &sink_port, to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let from_node = instantiated(clusters, from, "cluster");
            let to_node = instantiated(processes, to, "process");

            let sink_port = D::allocate_cluster_port(from_node);
            let source_port = D::allocate_process_port(to_node);

            (
                D::m2o_sink_source(compile_env, from_node, &sink_port, to_node, &source_port),
                D::m2o_connect(from_node, &sink_port, to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let from_node = instantiated(clusters, from, "cluster");
            let to_node = instantiated(clusters, to, "cluster");

            let sink_port = D::allocate_cluster_port(from_node);
            let source_port = D::allocate_cluster_port(to_node);

            (
                D::m2m_sink_source(compile_env, from_node, &sink_port, to_node, &source_port),
                D::m2m_connect(from_node, &sink_port, to_node, &source_port),
            )
        }
        (LocationId::Tick(_, _), _) | (_, LocationId::Tick(_, _)) => {
            panic!("A network endpoint must be a process or a cluster, not a tick location")
        }
    };

    (sink, source, connect_fn)
}
2986
#[cfg(test)]
mod test {
    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so that a change to it is noticed.
    #[test]
    fn hydro_node_size() {
        let node_bytes = std::mem::size_of::<HydroNode>();
        assert_eq!(node_bytes, 232);
    }

    /// Pins the in-memory size of `HydroLeaf` so that a change to it is noticed.
    #[test]
    fn hydro_leaf_size() {
        let leaf_bytes = std::mem::size_of::<HydroLeaf>();
        assert_eq!(leaf_bytes, 224);
    }

    /// The expression parsed from `x + y` must come back from
    /// `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain = syn::parse_str::<syn::Expr>("x + y").unwrap();
        assert_eq!(simplify_q_macro(plain.clone()), plain);
    }

    /// Snapshot of simplifying an expression spliced from a real `q!` closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot for a quoted block that yields a `move` closure, i.e. the
    /// quoted expression does not itself begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}