1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
/// Newtype around a boxed [`syn::Expr`].
///
/// Its `Debug` implementation prints the expression's token stream, and its `Display`
/// implementation prints a simplified `q!(...)` form.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39 fn from(expr: syn::Expr) -> Self {
40 Self(Box::new(expr))
41 }
42}
43
44impl Deref for DebugExpr {
45 type Target = syn::Expr;
46
47 fn deref(&self) -> &Self::Target {
48 &self.0
49 }
50}
51
52impl ToTokens for DebugExpr {
53 fn to_tokens(&self, tokens: &mut TokenStream) {
54 self.0.to_tokens(tokens);
55 }
56}
57
58impl Debug for DebugExpr {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 write!(f, "{}", self.0.to_token_stream())
61 }
62}
63
64impl Display for DebugExpr {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 let original = self.0.as_ref().clone();
67 let simplified = simplify_q_macro(original);
68
69 write!(f, "q!({})", quote::quote!(#simplified))
72 }
73}
74
75fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77 let mut simplifier = QMacroSimplifier::new();
80 simplifier.visit_expr_mut(&mut expr);
81
82 if let Some(simplified) = simplifier.simplified_result {
84 simplified
85 } else {
86 expr
87 }
88}
89
/// Mutable `syn` visitor that looks for a `stageleft::runtime_support::*_type_hint*(...)`
/// call and records the closure found among its arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted closure (or enclosing block), once one has been found; `None` until then.
    pub simplified_result: Option<syn::Expr>,
}
95
96impl QMacroSimplifier {
97 pub fn new() -> Self {
98 Self::default()
99 }
100}
101
102impl VisitMut for QMacroSimplifier {
103 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104 if self.simplified_result.is_some() {
106 return;
107 }
108
109 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110 && self.is_stageleft_runtime_support_call(&path_expr.path)
112 && let Some(closure) = self.extract_closure_from_args(&call.args)
114 {
115 self.simplified_result = Some(closure);
116 return;
117 }
118
119 syn::visit_mut::visit_expr_mut(self, expr);
122 }
123}
124
125impl QMacroSimplifier {
126 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127 if let Some(last_segment) = path.segments.last() {
129 let fn_name = last_segment.ident.to_string();
130 fn_name.contains("_type_hint")
132 && path.segments.len() > 2
133 && path.segments[0].ident == "stageleft"
134 && path.segments[1].ident == "runtime_support"
135 } else {
136 false
137 }
138 }
139
140 fn extract_closure_from_args(
141 &self,
142 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143 ) -> Option<syn::Expr> {
144 for arg in args {
146 if let syn::Expr::Closure(_) = arg {
147 return Some(arg.clone());
148 }
149 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151 return Some(closure_expr);
152 }
153 }
154 None
155 }
156
157 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158 let mut visitor = ClosureFinder {
159 found_closure: None,
160 prefer_inner_blocks: true,
161 };
162 visitor.visit_expr(expr);
163 visitor.found_closure
164 }
165}
166
/// Read-only `syn` visitor that records the first closure expression it encounters.
struct ClosureFinder {
    /// The first match found so far; once set, further visiting is skipped.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block statement that is itself a block containing a closure is
    /// returned as a whole instead of the bare closure.
    prefer_inner_blocks: bool,
}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175 if self.found_closure.is_some() {
177 return;
178 }
179
180 match expr {
181 syn::Expr::Closure(_) => {
182 self.found_closure = Some(expr.clone());
183 }
184 syn::Expr::Block(block) if self.prefer_inner_blocks => {
185 for stmt in &block.block.stmts {
187 if let syn::Stmt::Expr(stmt_expr, _) = stmt
188 && let syn::Expr::Block(_) = stmt_expr
189 {
190 let mut inner_visitor = ClosureFinder {
192 found_closure: None,
193 prefer_inner_blocks: false, };
195 inner_visitor.visit_expr(stmt_expr);
196 if inner_visitor.found_closure.is_some() {
197 self.found_closure = Some(stmt_expr.clone());
199 return;
200 }
201 }
202 }
203
204 visit::visit_expr(self, expr);
206
207 if self.found_closure.is_some() {
210 }
212 }
213 _ => {
214 visit::visit_expr(self, expr);
216 }
217 }
218 }
219}
220
/// Newtype around a boxed [`syn::Type`] whose `Debug` implementation prints the type's
/// token stream.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228 fn from(t: syn::Type) -> Self {
229 Self(Box::new(t))
230 }
231}
232
233impl Deref for DebugType {
234 type Target = syn::Type;
235
236 fn deref(&self) -> &Self::Target {
237 &self.0
238 }
239}
240
241impl ToTokens for DebugType {
242 fn to_tokens(&self, tokens: &mut TokenStream) {
243 self.0.to_tokens(tokens);
244 }
245}
246
247impl Debug for DebugType {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 write!(f, "{}", self.0.to_token_stream())
250 }
251}
252
/// Instantiation state of a network endpoint in the IR.
pub enum DebugInstantiate {
    /// Not yet instantiated; `compile_network` replaces this with `Finalized`.
    Building,
    /// Instantiated: holds the generated sink/source expressions and the connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
257
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side of the connection.
    sink: syn::Expr,
    /// Expression for the receiving side of the connection.
    source: syn::Expr,
    /// One-shot callback run by `connect_network`; taken (left as `None`) when invoked.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272 fn from(f: DebugInstantiateFinalized) -> Self {
273 Self::Finalized(Box::new(f))
274 }
275}
276
277impl Debug for DebugInstantiate {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 write!(f, "<network instantiate>")
280 }
281}
282
impl Hash for DebugInstantiate {
    /// Intentionally feeds nothing into the hasher, so the instantiation state never
    /// affects the hash of the IR that contains it.
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // No-op by design.
    }
}
288
289impl Clone for DebugInstantiate {
290 fn clone(&self) -> Self {
291 match self {
292 DebugInstantiate::Building => DebugInstantiate::Building,
293 DebugInstantiate::Finalized(_) => {
294 panic!("DebugInstantiate::Finalized should not be cloned")
295 }
296 }
297 }
298}
299
/// The kind of input feeding a `HydroNode::Source`.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by a stream expression.
    Stream(DebugExpr),
    /// Source fed by an external network connection (carries no expression).
    ExternalNetwork(),
    /// Source described by an iterator expression.
    Iter(DebugExpr),
    /// Source variant carrying no data. NOTE(review): exact runtime semantics are not
    /// visible in this file.
    Spin(),
}
308
/// Receiver of the DFIR statements generated while emitting the Hydro IR.
///
/// This file implements it for `BTreeMap<usize, FlatGraphBuilder>`; other implementations
/// may interpret the hooks differently.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Returns the [`FlatGraphBuilder`] that statements for `location` should be added to.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Hook for a batch step: connects `in_ident` (at `in_location`) to `out_ident`
    /// (at `out_location`). `_in_kind` and `op_meta` describe the input collection and the
    /// originating operator.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Hook for yielding a collection out of a tick: connects `in_ident` (at
    /// `in_location`) to `out_ident`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Hook for a non-deterministic observation at `location`: connects `in_ident` to
    /// `out_ident`, with the collection kinds before and after.
    fn observe_nondet(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
    );

    /// Creates both halves of a network link: a sender at `from` consuming `input_ident`
    /// into `sink` (optionally through `serialize`), and a receiver at `to` producing
    /// `out_ident` from `source` (optionally through `deserialize`). `tag_id` identifies
    /// the link.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Creates a receiver at `on` that produces `out_ident` from `source_expr`, optionally
    /// mapped through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Creates a sender at `on` that feeds `input_ident` into `sink_expr`, optionally
    /// mapped through `serialize`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    );
}
377
/// Map-backed [`DfirBuilder`]: one [`FlatGraphBuilder`] per root location id, created on
/// first use.
#[cfg(feature = "build")]
impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
    /// Looks up, or lazily creates, the builder keyed by the raw id of `location`'s root.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_id = location.root().raw_id();
        self.entry(root_id).or_default()
    }

    /// Aliases `out_ident` to `in_ident` in the builder of the input's root location.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Aliases `out_ident` to `in_ident` in the builder of the input's root location.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Aliases `out_ident` to `in_ident` in the builder for `location`.
    fn observe_nondet(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Adds the sending pipeline to the `from` builder and the receiving pipeline to the
    /// `to` builder, tagging them `send<tag_id>` and `recv<tag_id>` respectively.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        // Distinct operator tags keep the two halves of the link apart.
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        let sender = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_fn) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }

        let receiver = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_fn) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_fn);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Adds a `source_stream` pipeline, tagged `recv<tag_id>`, to the builder for `on`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver = self.get_dfir_mut(on);

        match deserialize {
            Some(deserialize_fn) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_fn);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Adds a `dest_sink` pipeline, tagged `send<tag_id>`, to the builder for `on`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let send_tag = format!("send{}", tag_id);
        let sender = self.get_dfir_mut(on);

        match serialize {
            Some(serialize_fn) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }
    }
}
547
/// Selects what `emit_core` does at each IR element: generate DFIR into a
/// [`DfirBuilder`], or invoke user callbacks instead.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Generate DFIR statements into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Call `L` for roots and `N` for nodes, each with the current statement id.
    Callback(L, N),
}
557
/// A root (terminal operator) of the Hydro IR; each variant consumes one input
/// [`HydroNode`].
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Emitted as `input -> for_each(f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to an external, identified by `to_external_id` and `to_key`.
    SendExternal {
        to_external_id: usize,
        to_key: usize,
        /// Selects the "many" sink path in `compile_network`.
        to_many: bool,
        /// Optional mapping applied before the sink.
        serialize_fn: Option<DebugExpr>,
        /// Starts as `Building`; finalized by `compile_network`.
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> dest_sink(sink)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `ident = input`, binding the input to a named cycle identifier.
    CycleSink {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
588
589impl HydroRoot {
    /// Finalizes every network endpoint reachable from this root.
    ///
    /// Walks the tree bottom-up and, for each `SendExternal` root, `Network` node and
    /// `ExternalInput` node still in [`DebugInstantiate::Building`], allocates ports via the
    /// deploy provider `D`, builds the sink/source expressions and stores them together with
    /// the connect callback as [`DebugInstantiate::Finalized`].
    ///
    /// # Panics
    ///
    /// Panics if an endpoint is already finalized, if a referenced external or process is
    /// missing from `externals` / `processes`, or if an external endpoint sits on a location
    /// that is neither a process nor a cluster. Cluster locations hit `todo!()`.
    #[cfg(feature = "build")]
    pub fn compile_network<'a, D>(
        &mut self,
        compile_env: &D::CompileEnv,
        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
        seen_tees: &mut SeenTees,
        processes: &HashMap<usize, D::Process>,
        clusters: &HashMap<usize, D::Cluster>,
        externals: &HashMap<usize, D::External>,
    ) where
        D: Deploy<'a>,
    {
        self.transform_bottom_up(
            // Root callback: only `SendExternal` carries an endpoint to finalize.
            &mut |l| {
                if let HydroRoot::SendExternal {
                    input,
                    to_external_id,
                    to_key,
                    to_many,
                    instantiate_fn,
                    ..
                } = l
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let to_node = externals
                                .get(to_external_id)
                                .unwrap_or_else(|| {
                                    panic!("A external used in the graph was not instantiated: {}", to_external_id)
                                })
                                .clone();

                            match input.metadata().location_kind.root() {
                                LocationId::Process(process_id) => {
                                    if *to_many {
                                        // "Many" path: sink is derived from the external id
                                        // and key alone; nothing to connect afterwards.
                                        (
                                            (
                                                D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
                                                parse_quote!(DUMMY),
                                            ),
                                            Box::new(|| {}) as Box<dyn FnOnce()>,
                                        )
                                    } else {
                                        let from_node = processes
                                            .get(process_id)
                                            .unwrap_or_else(|| {
                                                panic!("A process used in the graph was not instantiated: {}", process_id)
                                            })
                                            .clone();

                                        let sink_port = D::allocate_process_port(&from_node);
                                        let source_port = D::allocate_external_port(&to_node);

                                        // Register the external's port under the user-visible key.
                                        to_node.register(*to_key, source_port.clone());

                                        (
                                            (
                                                D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
                                                // The source side is unused for an outgoing
                                                // external, so a placeholder token is stored.
                                                parse_quote!(DUMMY),
                                            ),
                                            D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
                                        )
                                    }
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            // Node callback: finalizes `Network` and `ExternalInput` nodes.
            &mut |n| {
                if let HydroNode::Network {
                    input,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                        // Sender is the input's root location, receiver is this node's.
                        DebugInstantiate::Building => instantiate_network::<D>(
                            input.metadata().location_kind.root(),
                            metadata.location_kind.root(),
                            processes,
                            clusters,
                            compile_env,
                        ),

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                } else if let HydroNode::ExternalInput {
                    from_external_id,
                    from_key,
                    from_many,
                    codec_type,
                    port_hint,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let from_node = externals
                                .get(from_external_id)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "A external used in the graph was not instantiated: {}",
                                        from_external_id
                                    )
                                })
                                .clone();

                            match metadata.location_kind.root() {
                                LocationId::Process(process_id) => {
                                    let to_node = processes
                                        .get(process_id)
                                        .unwrap_or_else(|| {
                                            panic!("A process used in the graph was not instantiated: {}", process_id)
                                        })
                                        .clone();

                                    let sink_port = D::allocate_external_port(&from_node);
                                    let source_port = D::allocate_process_port(&to_node);

                                    // Register the external's port under the user-visible key.
                                    from_node.register(*from_key, sink_port.clone());

                                    (
                                        (
                                            // The sink side is unused for an incoming
                                            // external, so a placeholder token is stored.
                                            parse_quote!(DUMMY),
                                            if *from_many {
                                                // "Many" path may add statements to the
                                                // receiving process's `extra_stmts`.
                                                D::e2o_many_source(
                                                    compile_env,
                                                    extra_stmts.entry(*process_id).or_default(),
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_id, *from_key)
                                                )
                                            } else {
                                                D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port)
                                            },
                                        ),
                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
                                    )
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            seen_tees,
            false,
        );
    }
771
772 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
773 self.transform_bottom_up(
774 &mut |l| {
775 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
776 match instantiate_fn {
777 DebugInstantiate::Building => panic!("network not built"),
778
779 DebugInstantiate::Finalized(finalized) => {
780 (finalized.connect_fn.take().unwrap())();
781 }
782 }
783 }
784 },
785 &mut |n| {
786 if let HydroNode::Network { instantiate_fn, .. }
787 | HydroNode::ExternalInput { instantiate_fn, .. } = n
788 {
789 match instantiate_fn {
790 DebugInstantiate::Building => panic!("network not built"),
791
792 DebugInstantiate::Finalized(finalized) => {
793 (finalized.connect_fn.take().unwrap())();
794 }
795 }
796 }
797 },
798 seen_tees,
799 false,
800 );
801 }
802
803 pub fn transform_bottom_up(
804 &mut self,
805 transform_root: &mut impl FnMut(&mut HydroRoot),
806 transform_node: &mut impl FnMut(&mut HydroNode),
807 seen_tees: &mut SeenTees,
808 check_well_formed: bool,
809 ) {
810 self.transform_children(
811 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
812 seen_tees,
813 );
814
815 transform_root(self);
816 }
817
818 pub fn transform_children(
819 &mut self,
820 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
821 seen_tees: &mut SeenTees,
822 ) {
823 match self {
824 HydroRoot::ForEach { input, .. }
825 | HydroRoot::SendExternal { input, .. }
826 | HydroRoot::DestSink { input, .. }
827 | HydroRoot::CycleSink { input, .. } => {
828 transform(input, seen_tees);
829 }
830 }
831 }
832
833 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
834 match self {
835 HydroRoot::ForEach {
836 f,
837 input,
838 op_metadata,
839 } => HydroRoot::ForEach {
840 f: f.clone(),
841 input: Box::new(input.deep_clone(seen_tees)),
842 op_metadata: op_metadata.clone(),
843 },
844 HydroRoot::SendExternal {
845 to_external_id,
846 to_key,
847 to_many,
848 serialize_fn,
849 instantiate_fn,
850 input,
851 op_metadata,
852 } => HydroRoot::SendExternal {
853 to_external_id: *to_external_id,
854 to_key: *to_key,
855 to_many: *to_many,
856 serialize_fn: serialize_fn.clone(),
857 instantiate_fn: instantiate_fn.clone(),
858 input: Box::new(input.deep_clone(seen_tees)),
859 op_metadata: op_metadata.clone(),
860 },
861 HydroRoot::DestSink {
862 sink,
863 input,
864 op_metadata,
865 } => HydroRoot::DestSink {
866 sink: sink.clone(),
867 input: Box::new(input.deep_clone(seen_tees)),
868 op_metadata: op_metadata.clone(),
869 },
870 HydroRoot::CycleSink {
871 ident,
872 input,
873 op_metadata,
874 } => HydroRoot::CycleSink {
875 ident: ident.clone(),
876 input: Box::new(input.deep_clone(seen_tees)),
877 op_metadata: op_metadata.clone(),
878 },
879 }
880 }
881
882 #[cfg(feature = "build")]
883 pub fn emit(
884 &mut self,
885 graph_builders: &mut dyn DfirBuilder,
886 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
887 next_stmt_id: &mut usize,
888 ) {
889 self.emit_core(
890 &mut BuildersOrCallback::Builders::<
891 fn(&mut HydroRoot, &mut usize),
892 fn(&mut HydroNode, &mut usize),
893 >(graph_builders),
894 built_tees,
895 next_stmt_id,
896 );
897 }
898
    /// Emits this root after first emitting its input subtree.
    ///
    /// In `Builders` mode a DFIR statement for the root is added to the builder; in
    /// `Callback` mode the root callback is invoked with `self` and the current statement
    /// id instead. For every variant except `CycleSink`, `next_stmt_id` is incremented
    /// once afterwards; `CycleSink` neither invokes the callback nor consumes an id.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                // The input is emitted first; it yields the identifier to read from.
                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_kind)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink half is needed here; placeholders are used when
                        // the network has not been finalized yet.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_kind,
                            sink_expr,
                            &input_ident,
                            serialize_fn,
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_kind)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::CycleSink { ident, input, .. } => {
                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_kind)
                            .add_dfir(
                                parse_quote! {
                                    #ident = #input_ident;
                                },
                                None,
                                None,
                            );
                    }
                    // A cycle sink is a plain binding: no callback and no statement id.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }
        }
    }
1014
1015 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1016 match self {
1017 HydroRoot::ForEach { op_metadata, .. }
1018 | HydroRoot::SendExternal { op_metadata, .. }
1019 | HydroRoot::DestSink { op_metadata, .. }
1020 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1021 }
1022 }
1023
1024 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1025 match self {
1026 HydroRoot::ForEach { op_metadata, .. }
1027 | HydroRoot::SendExternal { op_metadata, .. }
1028 | HydroRoot::DestSink { op_metadata, .. }
1029 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1030 }
1031 }
1032
1033 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
1034 match self {
1035 HydroRoot::ForEach { input, .. }
1036 | HydroRoot::SendExternal { input, .. }
1037 | HydroRoot::DestSink { input, .. }
1038 | HydroRoot::CycleSink { input, .. } => {
1039 vec![input.metadata()]
1040 }
1041 }
1042 }
1043
1044 pub fn print_root(&self) -> String {
1045 match self {
1046 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1047 HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1048 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1049 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1050 }
1051 }
1052
1053 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1054 match self {
1055 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1056 transform(f);
1057 }
1058 HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1059 }
1060 }
1061}
1062
/// Emits every root in `ir` into a fresh map of graph builders and returns that map.
///
/// Statement ids start at 0 and the tee bookkeeping is shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut builders: BTreeMap<usize, FlatGraphBuilder> = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut()
        .for_each(|root| root.emit(&mut builders, &mut built_tees, &mut next_stmt_id));

    builders
}
1073
/// Walks every root in `ir` with `emit_core` in callback mode, invoking `transform_root`
/// and `transform_node` with the running statement id instead of generating DFIR.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callbacks = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(&mut callbacks, &mut seen_tees, &mut next_stmt_id);
    }
}
1087
1088pub fn transform_bottom_up(
1089 ir: &mut [HydroRoot],
1090 transform_root: &mut impl FnMut(&mut HydroRoot),
1091 transform_node: &mut impl FnMut(&mut HydroNode),
1092 check_well_formed: bool,
1093) {
1094 let mut seen_tees = HashMap::new();
1095 ir.iter_mut().for_each(|leaf| {
1096 leaf.transform_bottom_up(
1097 transform_root,
1098 transform_node,
1099 &mut seen_tees,
1100 check_well_formed,
1101 );
1102 });
1103}
1104
1105pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1106 let mut seen_tees = HashMap::new();
1107 ir.iter()
1108 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1109 .collect()
1110}
1111
/// Per-thread bookkeeping for debug-printing tees: `None` when de-duplication is off,
/// otherwise `(next id to hand out, ids already assigned, keyed by tee pointer)`.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Starts as `None`; `dbg_dedup_tee` installs a state while its closure runs.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1116
1117pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1118 PRINTED_TEES.with(|printed_tees| {
1119 let mut printed_tees_mut = printed_tees.borrow_mut();
1120 *printed_tees_mut = Some((0, HashMap::new()));
1121 drop(printed_tees_mut);
1122
1123 let ret = f();
1124
1125 let mut printed_tees_mut = printed_tees.borrow_mut();
1126 *printed_tees_mut = None;
1127
1128 ret
1129 })
1130}
1131
/// Shared, interior-mutable handle to a [`HydroNode`] that may be referenced from several
/// places in the IR.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1133
1134impl TeeNode {
1135 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1136 Rc::as_ptr(&self.0)
1137 }
1138}
1139
1140impl Debug for TeeNode {
1141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1142 PRINTED_TEES.with(|printed_tees| {
1143 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1144 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1145
1146 if let Some(printed_tees_mut) = printed_tees_mut {
1147 if let Some(existing) = printed_tees_mut
1148 .1
1149 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1150 {
1151 write!(f, "<tee {}>", existing)
1152 } else {
1153 let next_id = printed_tees_mut.0;
1154 printed_tees_mut.0 += 1;
1155 printed_tees_mut
1156 .1
1157 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1158 drop(printed_tees_mut_borrow);
1159 write!(f, "<tee {}>: ", next_id)?;
1160 Debug::fmt(&self.0.borrow(), f)
1161 }
1162 } else {
1163 drop(printed_tees_mut_borrow);
1164 write!(f, "<tee>: ")?;
1165 Debug::fmt(&self.0.borrow(), f)
1166 }
1167 })
1168 }
1169}
1170
1171impl Hash for TeeNode {
1172 fn hash<H: Hasher>(&self, state: &mut H) {
1173 self.0.borrow_mut().hash(state);
1174 }
1175}
1176
/// Whether a collection is bounded or unbounded.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1182
/// Ordering property recorded for a stream's elements.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1188
/// Delivery (retry) property recorded for a stream's elements.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1194
/// Boundedness property recorded for a keyed singleton.
// NOTE(review): the precise difference between `BoundedValue` and `Bounded` is not
// visible in this file; confirm against the collection definitions.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1201
/// The kind of collection an IR node produces, together with its element type(s) and
/// its boundedness / ordering / retry properties.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton of `element_type`.
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// An optional of `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; ordering and retry properties apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton from `key_type` to `value_type`.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1231
/// Metadata attached to IR nodes.
///
/// It is deliberately invisible to hashing and equality (see the `Hash` and `PartialEq`
/// implementations), and `Debug` prints only `location_kind` and `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location the node's output lives at.
    pub location_kind: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    /// Optional cardinality figure. NOTE(review): who populates it is not visible here.
    pub cardinality: Option<usize>,
    /// Optional free-form tag.
    pub tag: Option<String>,
    /// Metadata about the operator itself (backtrace, usage figures, id).
    pub op: HydroIrOpMetadata,
}
1240
impl Hash for HydroIrMetadata {
    /// Intentionally hashes nothing, so metadata never affects the hash of an IR node.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1245
impl PartialEq for HydroIrMetadata {
    /// Always `true`: any two metadata values compare equal, which is consistent with
    /// the no-op `Hash` implementation.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}
1251
// The always-true `PartialEq` is trivially reflexive, so `Eq` holds.
impl Eq for HydroIrMetadata {}
1253
1254impl Debug for HydroIrMetadata {
1255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1256 f.debug_struct("HydroIrMetadata")
1257 .field("location_kind", &self.location_kind)
1258 .field("collection_kind", &self.collection_kind)
1259 .finish()
1260 }
1261}
1262
/// Metadata about a single operator in the IR.
///
/// It is ignored by hashing and omitted from `Debug` output (see the implementations).
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created.
    pub backtrace: Backtrace,
    /// Optional CPU usage figure; `None` on construction.
    pub cpu_usage: Option<f64>,
    /// Optional CPU usage figure for network receive; `None` on construction.
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional operator id; `None` on construction.
    pub id: Option<usize>,
}
1272
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and all optional fields unset.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // One extra frame to skip, for this wrapper itself.
        Self::new_with_skip(1)
    }

    /// Captures a backtrace that skips `2 + skip_count` frames and leaves the optional
    /// fields as `None`.
    // NOTE(review): the constant 2 presumably accounts for this function and the
    // backtrace helper; the count depends on the exact call chain, so do not inline or
    // wrap these functions without re-checking it.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1291
1292impl Debug for HydroIrOpMetadata {
1293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1294 f.debug_struct("HydroIrOpMetadata").finish()
1295 }
1296}
1297
impl Hash for HydroIrOpMetadata {
    /// Intentionally hashes nothing, so operator metadata never affects an IR node's hash.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1301
/// A node of the IR tree.
///
/// Every variant except [`HydroNode::Placeholder`] carries a `metadata` field. Child
/// nodes are owned through `Box<HydroNode>`, except for [`HydroNode::Tee`], whose child
/// lives in a shared `TeeNode` cell so that several `Tee` nodes can point at the same
/// upstream subtree.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in stored in a tee cell while its real contents are being
    /// transformed or cloned. `transform_children` and `emit_core` panic on it.
    Placeholder,

    /// Wrapper around `inner`; `emit_core` emits no statement for it and returns the
    /// ident produced for `inner`.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Wrapper around `inner` that `emit_core` hands to the graph builders'
    /// `observe_nondet`, together with the inner and outer collection kinds.
    ObserveNonDet {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no children) that produces data from a `HydroSource`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no children); `emit_core` returns `ident` itself as the stream ident.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// Reference to a shared upstream subtree. `emit_core` emits the shared subtree
    /// followed by a `tee()` only the first time a given cell is seen, and reuses the
    /// resulting ident afterwards.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `persist::<'static>()` applied to `inner`. When it is a direct child
    /// of the join-like nodes, their emitters skip it and emit its `inner` instead,
    /// using a `'static` lifetime argument.
    Persist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Wrapper around `inner`; `emit_core` emits no statement for it and returns the
    /// ident produced for `inner`.
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Wrapper around `inner`; `emit_core` emits no statement for it and returns the
    /// ident produced for `inner`.
    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Wrapper around `inner` that `emit_core` hands to the graph builders' `batch`.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Wrapper around `inner` that `emit_core` hands to the graph builders'
    /// `yield_from_tick`.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `chain()` with `first` on port `[0]` and `second` on port `[1]`.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `chain_first_n(1)` with `first` on port `[0]` and `second` on port `[1]`.
    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `cross_join_multiset` with `left` on port `[0]` and `right` on port `[1]`.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `cross_singleton()` with `left` on port `[input]` and `right` on
    /// port `[single]`.
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `join_multiset` with `left` on port `[0]` and `right` on port `[1]`.
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `difference_multiset` with `pos` on port `[pos]` and `neg` on port `[neg]`.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `anti_join_multiset` with `pos` on port `[pos]` and `neg` on port `[neg]`.
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `resolve_futures()` applied to `input`.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `resolve_futures_ordered()` applied to `input`.
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `map(f)` applied to `input`.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `flat_map(f)` applied to `input`.
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `filter(f)` applied to `input`.
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `filter_map(f)` applied to `input`.
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node with child `input`.
    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node with child `input`.
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node with child `input` and an expression `f`.
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node with child `input`.
    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `sort()` applied to `input`.
    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node with child `input` and the expressions `init` and `acc`.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node with child `input` and the expressions `init` and `acc`.
    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node with child `input` and the expressions `init` and `acc`.
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node with child `input` and an expression `f`.
    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node with child `input` and an expression `f`.
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Two-child node: `transform_children` visits `input` first, then `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node with child `input`. It is the only variant exempt from the
    /// location check in `transform_bottom_up`, so its input may have a different
    /// root location than the node itself.
    Network {
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no children).
    ExternalInput {
        from_external_id: usize,
        from_key: usize,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node with child `input`.
    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1529
/// Maps the address of an original tee cell to the cell that replaces it, so that every
/// `Tee` node sharing one cell is redirected to the same transformed (or cloned) cell.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a `LocationId`.
// NOTE(review): no use of this alias is visible in this part of the file — confirm its
// purpose at the call sites.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1532
1533impl HydroNode {
1534 pub fn transform_bottom_up(
1535 &mut self,
1536 transform: &mut impl FnMut(&mut HydroNode),
1537 seen_tees: &mut SeenTees,
1538 check_well_formed: bool,
1539 ) {
1540 self.transform_children(
1541 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1542 seen_tees,
1543 );
1544
1545 transform(self);
1546
1547 let self_location = self.metadata().location_kind.root();
1548
1549 if check_well_formed {
1550 match &*self {
1551 HydroNode::Network { .. } => {}
1552 _ => {
1553 self.input_metadata().iter().for_each(|i| {
1554 if i.location_kind.root() != self_location {
1555 panic!(
1556 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1557 i,
1558 i.location_kind.root(),
1559 self,
1560 self_location
1561 )
1562 }
1563 });
1564 }
1565 }
1566 }
1567 }
1568
1569 #[inline(always)]
1570 pub fn transform_children(
1571 &mut self,
1572 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1573 seen_tees: &mut SeenTees,
1574 ) {
1575 match self {
1576 HydroNode::Placeholder => {
1577 panic!();
1578 }
1579
1580 HydroNode::Source { .. }
1581 | HydroNode::CycleSource { .. }
1582 | HydroNode::ExternalInput { .. } => {}
1583
1584 HydroNode::Tee { inner, .. } => {
1585 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1586 *inner = TeeNode(transformed.clone());
1587 } else {
1588 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1589 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1590 let mut orig = inner.0.replace(HydroNode::Placeholder);
1591 transform(&mut orig, seen_tees);
1592 *transformed_cell.borrow_mut() = orig;
1593 *inner = TeeNode(transformed_cell);
1594 }
1595 }
1596
1597 HydroNode::Cast { inner, .. }
1598 | HydroNode::ObserveNonDet { inner, .. }
1599 | HydroNode::Persist { inner, .. }
1600 | HydroNode::BeginAtomic { inner, .. }
1601 | HydroNode::EndAtomic { inner, .. }
1602 | HydroNode::Batch { inner, .. }
1603 | HydroNode::YieldConcat { inner, .. } => {
1604 transform(inner.as_mut(), seen_tees);
1605 }
1606
1607 HydroNode::Chain { first, second, .. } => {
1608 transform(first.as_mut(), seen_tees);
1609 transform(second.as_mut(), seen_tees);
1610 }
1611
1612 HydroNode::ChainFirst { first, second, .. } => {
1613 transform(first.as_mut(), seen_tees);
1614 transform(second.as_mut(), seen_tees);
1615 }
1616
1617 HydroNode::CrossSingleton { left, right, .. }
1618 | HydroNode::CrossProduct { left, right, .. }
1619 | HydroNode::Join { left, right, .. } => {
1620 transform(left.as_mut(), seen_tees);
1621 transform(right.as_mut(), seen_tees);
1622 }
1623
1624 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1625 transform(pos.as_mut(), seen_tees);
1626 transform(neg.as_mut(), seen_tees);
1627 }
1628
1629 HydroNode::ReduceKeyedWatermark {
1630 input, watermark, ..
1631 } => {
1632 transform(input.as_mut(), seen_tees);
1633 transform(watermark.as_mut(), seen_tees);
1634 }
1635
1636 HydroNode::Map { input, .. }
1637 | HydroNode::ResolveFutures { input, .. }
1638 | HydroNode::ResolveFuturesOrdered { input, .. }
1639 | HydroNode::FlatMap { input, .. }
1640 | HydroNode::Filter { input, .. }
1641 | HydroNode::FilterMap { input, .. }
1642 | HydroNode::Sort { input, .. }
1643 | HydroNode::DeferTick { input, .. }
1644 | HydroNode::Enumerate { input, .. }
1645 | HydroNode::Inspect { input, .. }
1646 | HydroNode::Unique { input, .. }
1647 | HydroNode::Network { input, .. }
1648 | HydroNode::Fold { input, .. }
1649 | HydroNode::Scan { input, .. }
1650 | HydroNode::FoldKeyed { input, .. }
1651 | HydroNode::Reduce { input, .. }
1652 | HydroNode::ReduceKeyed { input, .. }
1653 | HydroNode::Counter { input, .. } => {
1654 transform(input.as_mut(), seen_tees);
1655 }
1656 }
1657 }
1658
1659 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1660 match self {
1661 HydroNode::Placeholder => HydroNode::Placeholder,
1662 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1663 inner: Box::new(inner.deep_clone(seen_tees)),
1664 metadata: metadata.clone(),
1665 },
1666 HydroNode::ObserveNonDet { inner, metadata } => HydroNode::ObserveNonDet {
1667 inner: Box::new(inner.deep_clone(seen_tees)),
1668 metadata: metadata.clone(),
1669 },
1670 HydroNode::Source { source, metadata } => HydroNode::Source {
1671 source: source.clone(),
1672 metadata: metadata.clone(),
1673 },
1674 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1675 ident: ident.clone(),
1676 metadata: metadata.clone(),
1677 },
1678 HydroNode::Tee { inner, metadata } => {
1679 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1680 HydroNode::Tee {
1681 inner: TeeNode(transformed.clone()),
1682 metadata: metadata.clone(),
1683 }
1684 } else {
1685 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1686 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1687 let cloned = inner.0.borrow().deep_clone(seen_tees);
1688 *new_rc.borrow_mut() = cloned;
1689 HydroNode::Tee {
1690 inner: TeeNode(new_rc),
1691 metadata: metadata.clone(),
1692 }
1693 }
1694 }
1695 HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1696 inner: Box::new(inner.deep_clone(seen_tees)),
1697 metadata: metadata.clone(),
1698 },
1699 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1700 inner: Box::new(inner.deep_clone(seen_tees)),
1701 metadata: metadata.clone(),
1702 },
1703 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1704 inner: Box::new(inner.deep_clone(seen_tees)),
1705 metadata: metadata.clone(),
1706 },
1707 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1708 inner: Box::new(inner.deep_clone(seen_tees)),
1709 metadata: metadata.clone(),
1710 },
1711 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1712 inner: Box::new(inner.deep_clone(seen_tees)),
1713 metadata: metadata.clone(),
1714 },
1715 HydroNode::Chain {
1716 first,
1717 second,
1718 metadata,
1719 } => HydroNode::Chain {
1720 first: Box::new(first.deep_clone(seen_tees)),
1721 second: Box::new(second.deep_clone(seen_tees)),
1722 metadata: metadata.clone(),
1723 },
1724 HydroNode::ChainFirst {
1725 first,
1726 second,
1727 metadata,
1728 } => HydroNode::ChainFirst {
1729 first: Box::new(first.deep_clone(seen_tees)),
1730 second: Box::new(second.deep_clone(seen_tees)),
1731 metadata: metadata.clone(),
1732 },
1733 HydroNode::CrossProduct {
1734 left,
1735 right,
1736 metadata,
1737 } => HydroNode::CrossProduct {
1738 left: Box::new(left.deep_clone(seen_tees)),
1739 right: Box::new(right.deep_clone(seen_tees)),
1740 metadata: metadata.clone(),
1741 },
1742 HydroNode::CrossSingleton {
1743 left,
1744 right,
1745 metadata,
1746 } => HydroNode::CrossSingleton {
1747 left: Box::new(left.deep_clone(seen_tees)),
1748 right: Box::new(right.deep_clone(seen_tees)),
1749 metadata: metadata.clone(),
1750 },
1751 HydroNode::Join {
1752 left,
1753 right,
1754 metadata,
1755 } => HydroNode::Join {
1756 left: Box::new(left.deep_clone(seen_tees)),
1757 right: Box::new(right.deep_clone(seen_tees)),
1758 metadata: metadata.clone(),
1759 },
1760 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1761 pos: Box::new(pos.deep_clone(seen_tees)),
1762 neg: Box::new(neg.deep_clone(seen_tees)),
1763 metadata: metadata.clone(),
1764 },
1765 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1766 pos: Box::new(pos.deep_clone(seen_tees)),
1767 neg: Box::new(neg.deep_clone(seen_tees)),
1768 metadata: metadata.clone(),
1769 },
1770 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1771 input: Box::new(input.deep_clone(seen_tees)),
1772 metadata: metadata.clone(),
1773 },
1774 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1775 HydroNode::ResolveFuturesOrdered {
1776 input: Box::new(input.deep_clone(seen_tees)),
1777 metadata: metadata.clone(),
1778 }
1779 }
1780 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1781 f: f.clone(),
1782 input: Box::new(input.deep_clone(seen_tees)),
1783 metadata: metadata.clone(),
1784 },
1785 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1786 f: f.clone(),
1787 input: Box::new(input.deep_clone(seen_tees)),
1788 metadata: metadata.clone(),
1789 },
1790 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1791 f: f.clone(),
1792 input: Box::new(input.deep_clone(seen_tees)),
1793 metadata: metadata.clone(),
1794 },
1795 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1796 f: f.clone(),
1797 input: Box::new(input.deep_clone(seen_tees)),
1798 metadata: metadata.clone(),
1799 },
1800 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1801 input: Box::new(input.deep_clone(seen_tees)),
1802 metadata: metadata.clone(),
1803 },
1804 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1805 input: Box::new(input.deep_clone(seen_tees)),
1806 metadata: metadata.clone(),
1807 },
1808 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1809 f: f.clone(),
1810 input: Box::new(input.deep_clone(seen_tees)),
1811 metadata: metadata.clone(),
1812 },
1813 HydroNode::Unique { input, metadata } => HydroNode::Unique {
1814 input: Box::new(input.deep_clone(seen_tees)),
1815 metadata: metadata.clone(),
1816 },
1817 HydroNode::Sort { input, metadata } => HydroNode::Sort {
1818 input: Box::new(input.deep_clone(seen_tees)),
1819 metadata: metadata.clone(),
1820 },
1821 HydroNode::Fold {
1822 init,
1823 acc,
1824 input,
1825 metadata,
1826 } => HydroNode::Fold {
1827 init: init.clone(),
1828 acc: acc.clone(),
1829 input: Box::new(input.deep_clone(seen_tees)),
1830 metadata: metadata.clone(),
1831 },
1832 HydroNode::Scan {
1833 init,
1834 acc,
1835 input,
1836 metadata,
1837 } => HydroNode::Scan {
1838 init: init.clone(),
1839 acc: acc.clone(),
1840 input: Box::new(input.deep_clone(seen_tees)),
1841 metadata: metadata.clone(),
1842 },
1843 HydroNode::FoldKeyed {
1844 init,
1845 acc,
1846 input,
1847 metadata,
1848 } => HydroNode::FoldKeyed {
1849 init: init.clone(),
1850 acc: acc.clone(),
1851 input: Box::new(input.deep_clone(seen_tees)),
1852 metadata: metadata.clone(),
1853 },
1854 HydroNode::ReduceKeyedWatermark {
1855 f,
1856 input,
1857 watermark,
1858 metadata,
1859 } => HydroNode::ReduceKeyedWatermark {
1860 f: f.clone(),
1861 input: Box::new(input.deep_clone(seen_tees)),
1862 watermark: Box::new(watermark.deep_clone(seen_tees)),
1863 metadata: metadata.clone(),
1864 },
1865 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1866 f: f.clone(),
1867 input: Box::new(input.deep_clone(seen_tees)),
1868 metadata: metadata.clone(),
1869 },
1870 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1871 f: f.clone(),
1872 input: Box::new(input.deep_clone(seen_tees)),
1873 metadata: metadata.clone(),
1874 },
1875 HydroNode::Network {
1876 serialize_fn,
1877 instantiate_fn,
1878 deserialize_fn,
1879 input,
1880 metadata,
1881 } => HydroNode::Network {
1882 serialize_fn: serialize_fn.clone(),
1883 instantiate_fn: instantiate_fn.clone(),
1884 deserialize_fn: deserialize_fn.clone(),
1885 input: Box::new(input.deep_clone(seen_tees)),
1886 metadata: metadata.clone(),
1887 },
1888 HydroNode::ExternalInput {
1889 from_external_id,
1890 from_key,
1891 from_many,
1892 codec_type,
1893 port_hint,
1894 instantiate_fn,
1895 deserialize_fn,
1896 metadata,
1897 } => HydroNode::ExternalInput {
1898 from_external_id: *from_external_id,
1899 from_key: *from_key,
1900 from_many: *from_many,
1901 codec_type: codec_type.clone(),
1902 port_hint: *port_hint,
1903 instantiate_fn: instantiate_fn.clone(),
1904 deserialize_fn: deserialize_fn.clone(),
1905 metadata: metadata.clone(),
1906 },
1907 HydroNode::Counter {
1908 tag,
1909 duration,
1910 prefix,
1911 input,
1912 metadata,
1913 } => HydroNode::Counter {
1914 tag: tag.clone(),
1915 duration: duration.clone(),
1916 prefix: prefix.clone(),
1917 input: Box::new(input.deep_clone(seen_tees)),
1918 metadata: metadata.clone(),
1919 },
1920 }
1921 }
1922
1923 #[cfg(feature = "build")]
1924 pub fn emit_core(
1925 &mut self,
1926 builders_or_callback: &mut BuildersOrCallback<
1927 impl FnMut(&mut HydroRoot, &mut usize),
1928 impl FnMut(&mut HydroNode, &mut usize),
1929 >,
1930 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1931 next_stmt_id: &mut usize,
1932 ) -> syn::Ident {
1933 let out_location = self.metadata().location_kind.clone();
1934 match self {
1935 HydroNode::Placeholder => {
1936 panic!()
1937 }
1938
1939 HydroNode::Cast { inner, .. } => {
1940 inner.emit_core(builders_or_callback, built_tees, next_stmt_id)
1941 }
1942
1943 HydroNode::ObserveNonDet {
1944 inner, metadata, ..
1945 } => {
1946 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1947
1948 let observe_ident =
1949 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1950
1951 match builders_or_callback {
1952 BuildersOrCallback::Builders(graph_builders) => {
1953 graph_builders.observe_nondet(
1954 &inner.metadata().location_kind,
1955 inner_ident,
1956 &inner.metadata().collection_kind,
1957 &observe_ident,
1958 &metadata.collection_kind,
1959 );
1960 }
1961 BuildersOrCallback::Callback(_, node_callback) => {
1962 node_callback(self, next_stmt_id);
1963 }
1964 }
1965
1966 *next_stmt_id += 1;
1967
1968 observe_ident
1969 }
1970
1971 HydroNode::Persist { inner, .. } => {
1972 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1973
1974 let persist_ident =
1975 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1976
1977 match builders_or_callback {
1978 BuildersOrCallback::Builders(graph_builders) => {
1979 let builder = graph_builders.get_dfir_mut(&out_location);
1980 builder.add_dfir(
1981 parse_quote! {
1982 #persist_ident = #inner_ident -> persist::<'static>();
1983 },
1984 None,
1985 Some(&next_stmt_id.to_string()),
1986 );
1987 }
1988 BuildersOrCallback::Callback(_, node_callback) => {
1989 node_callback(self, next_stmt_id);
1990 }
1991 }
1992
1993 *next_stmt_id += 1;
1994
1995 persist_ident
1996 }
1997
1998 HydroNode::Batch {
1999 inner, metadata, ..
2000 } => {
2001 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2002
2003 let batch_ident =
2004 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2005
2006 match builders_or_callback {
2007 BuildersOrCallback::Builders(graph_builders) => {
2008 graph_builders.batch(
2009 inner_ident,
2010 &inner.metadata().location_kind,
2011 &inner.metadata().collection_kind,
2012 &batch_ident,
2013 &out_location,
2014 &metadata.op,
2015 );
2016 }
2017 BuildersOrCallback::Callback(_, node_callback) => {
2018 node_callback(self, next_stmt_id);
2019 }
2020 }
2021
2022 *next_stmt_id += 1;
2023
2024 batch_ident
2025 }
2026
2027 HydroNode::YieldConcat { inner, .. } => {
2028 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2029
2030 let yield_ident =
2031 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2032
2033 match builders_or_callback {
2034 BuildersOrCallback::Builders(graph_builders) => {
2035 graph_builders.yield_from_tick(
2036 inner_ident,
2037 &inner.metadata().location_kind,
2038 &inner.metadata().collection_kind,
2039 &yield_ident,
2040 );
2041 }
2042 BuildersOrCallback::Callback(_, node_callback) => {
2043 node_callback(self, next_stmt_id);
2044 }
2045 }
2046
2047 *next_stmt_id += 1;
2048
2049 yield_ident
2050 }
2051
2052 HydroNode::BeginAtomic { inner, .. } => {
2053 inner.emit_core(builders_or_callback, built_tees, next_stmt_id)
2054 }
2055
2056 HydroNode::EndAtomic { inner, .. } => {
2057 inner.emit_core(builders_or_callback, built_tees, next_stmt_id)
2058 }
2059
2060 HydroNode::Source {
2061 source, metadata, ..
2062 } => {
2063 if let HydroSource::ExternalNetwork() = source {
2064 syn::Ident::new("DUMMY", Span::call_site())
2065 } else {
2066 let source_ident =
2067 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2068
2069 let source_stmt = match source {
2070 HydroSource::Stream(expr) => {
2071 debug_assert!(metadata.location_kind.is_top_level());
2072 parse_quote! {
2073 #source_ident = source_stream(#expr);
2074 }
2075 }
2076
2077 HydroSource::ExternalNetwork() => {
2078 unreachable!()
2079 }
2080
2081 HydroSource::Iter(expr) => {
2082 if metadata.location_kind.is_top_level() {
2083 parse_quote! {
2084 #source_ident = source_iter(#expr);
2085 }
2086 } else {
2087 parse_quote! {
2089 #source_ident = source_iter(#expr) -> persist::<'static>();
2090 }
2091 }
2092 }
2093
2094 HydroSource::Spin() => {
2095 debug_assert!(metadata.location_kind.is_top_level());
2096 parse_quote! {
2097 #source_ident = spin();
2098 }
2099 }
2100 };
2101
2102 match builders_or_callback {
2103 BuildersOrCallback::Builders(graph_builders) => {
2104 let builder = graph_builders.get_dfir_mut(&out_location);
2105 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2106 }
2107 BuildersOrCallback::Callback(_, node_callback) => {
2108 node_callback(self, next_stmt_id);
2109 }
2110 }
2111
2112 *next_stmt_id += 1;
2113
2114 source_ident
2115 }
2116 }
2117
2118 HydroNode::CycleSource { ident, .. } => {
2119 let ident = ident.clone();
2120
2121 match builders_or_callback {
2122 BuildersOrCallback::Builders(_) => {}
2123 BuildersOrCallback::Callback(_, node_callback) => {
2124 node_callback(self, next_stmt_id);
2125 }
2126 }
2127
2128 *next_stmt_id += 1;
2130
2131 ident
2132 }
2133
2134 HydroNode::Tee { inner, .. } => {
2135 let ret_ident = if let Some(teed_from) =
2136 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2137 {
2138 match builders_or_callback {
2139 BuildersOrCallback::Builders(_) => {}
2140 BuildersOrCallback::Callback(_, node_callback) => {
2141 node_callback(self, next_stmt_id);
2142 }
2143 }
2144
2145 teed_from.clone()
2146 } else {
2147 let inner_ident = inner.0.borrow_mut().emit_core(
2148 builders_or_callback,
2149 built_tees,
2150 next_stmt_id,
2151 );
2152
2153 let tee_ident =
2154 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2155
2156 built_tees.insert(
2157 inner.0.as_ref() as *const RefCell<HydroNode>,
2158 tee_ident.clone(),
2159 );
2160
2161 match builders_or_callback {
2162 BuildersOrCallback::Builders(graph_builders) => {
2163 let builder = graph_builders.get_dfir_mut(&out_location);
2164 builder.add_dfir(
2165 parse_quote! {
2166 #tee_ident = #inner_ident -> tee();
2167 },
2168 None,
2169 Some(&next_stmt_id.to_string()),
2170 );
2171 }
2172 BuildersOrCallback::Callback(_, node_callback) => {
2173 node_callback(self, next_stmt_id);
2174 }
2175 }
2176
2177 tee_ident
2178 };
2179
2180 *next_stmt_id += 1;
2184 ret_ident
2185 }
2186
2187 HydroNode::Chain { first, second, .. } => {
2188 let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2189 let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2190
2191 let chain_ident =
2192 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2193
2194 match builders_or_callback {
2195 BuildersOrCallback::Builders(graph_builders) => {
2196 let builder = graph_builders.get_dfir_mut(&out_location);
2197 builder.add_dfir(
2198 parse_quote! {
2199 #chain_ident = chain();
2200 #first_ident -> [0]#chain_ident;
2201 #second_ident -> [1]#chain_ident;
2202 },
2203 None,
2204 Some(&next_stmt_id.to_string()),
2205 );
2206 }
2207 BuildersOrCallback::Callback(_, node_callback) => {
2208 node_callback(self, next_stmt_id);
2209 }
2210 }
2211
2212 *next_stmt_id += 1;
2213
2214 chain_ident
2215 }
2216
2217 HydroNode::ChainFirst { first, second, .. } => {
2218 let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2219 let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2220
2221 let chain_ident =
2222 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2223
2224 match builders_or_callback {
2225 BuildersOrCallback::Builders(graph_builders) => {
2226 let builder = graph_builders.get_dfir_mut(&out_location);
2227 builder.add_dfir(
2228 parse_quote! {
2229 #chain_ident = chain_first_n(1);
2230 #first_ident -> [0]#chain_ident;
2231 #second_ident -> [1]#chain_ident;
2232 },
2233 None,
2234 Some(&next_stmt_id.to_string()),
2235 );
2236 }
2237 BuildersOrCallback::Callback(_, node_callback) => {
2238 node_callback(self, next_stmt_id);
2239 }
2240 }
2241
2242 *next_stmt_id += 1;
2243
2244 chain_ident
2245 }
2246
2247 HydroNode::CrossSingleton { left, right, .. } => {
2248 let left_ident = left.emit_core(builders_or_callback, built_tees, next_stmt_id);
2249 let right_ident = right.emit_core(builders_or_callback, built_tees, next_stmt_id);
2250
2251 let cross_ident =
2252 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2253
2254 match builders_or_callback {
2255 BuildersOrCallback::Builders(graph_builders) => {
2256 let builder = graph_builders.get_dfir_mut(&out_location);
2257 builder.add_dfir(
2258 parse_quote! {
2259 #cross_ident = cross_singleton();
2260 #left_ident -> [input]#cross_ident;
2261 #right_ident -> [single]#cross_ident;
2262 },
2263 None,
2264 Some(&next_stmt_id.to_string()),
2265 );
2266 }
2267 BuildersOrCallback::Callback(_, node_callback) => {
2268 node_callback(self, next_stmt_id);
2269 }
2270 }
2271
2272 *next_stmt_id += 1;
2273
2274 cross_ident
2275 }
2276
2277 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2278 let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
2279 parse_quote!(cross_join_multiset)
2280 } else {
2281 parse_quote!(join_multiset)
2282 };
2283
2284 let (HydroNode::CrossProduct { left, right, .. }
2285 | HydroNode::Join { left, right, .. }) = self
2286 else {
2287 unreachable!()
2288 };
2289
2290 let is_top_level = left.metadata().location_kind.is_top_level()
2291 && right.metadata().location_kind.is_top_level();
2292 let (left_inner, left_lifetime) =
2293 if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
2294 debug_assert!(!left.metadata().location_kind.is_top_level());
2295 (left, quote!('static))
2296 } else if left.metadata().location_kind.is_top_level() {
2297 (left, quote!('static))
2298 } else {
2299 (left, quote!('tick))
2300 };
2301
2302 let (right_inner, right_lifetime) =
2303 if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
2304 debug_assert!(!right.metadata().location_kind.is_top_level());
2305 (right, quote!('static))
2306 } else if right.metadata().location_kind.is_top_level() {
2307 (right, quote!('static))
2308 } else {
2309 (right, quote!('tick))
2310 };
2311
2312 let left_ident =
2313 left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2314 let right_ident =
2315 right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2316
2317 let stream_ident =
2318 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2319
2320 match builders_or_callback {
2321 BuildersOrCallback::Builders(graph_builders) => {
2322 let builder = graph_builders.get_dfir_mut(&out_location);
2323 builder.add_dfir(
2324 if is_top_level {
2325 parse_quote! {
2328 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2329 #left_ident -> [0]#stream_ident;
2330 #right_ident -> [1]#stream_ident;
2331 }
2332 } else {
2333 parse_quote! {
2334 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2335 #left_ident -> [0]#stream_ident;
2336 #right_ident -> [1]#stream_ident;
2337 }
2338 }
2339 ,
2340 None,
2341 Some(&next_stmt_id.to_string()),
2342 );
2343 }
2344 BuildersOrCallback::Callback(_, node_callback) => {
2345 node_callback(self, next_stmt_id);
2346 }
2347 }
2348
2349 *next_stmt_id += 1;
2350
2351 stream_ident
2352 }
2353
2354 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2355 let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2356 parse_quote!(difference_multiset)
2357 } else {
2358 parse_quote!(anti_join_multiset)
2359 };
2360
2361 let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2362 self
2363 else {
2364 unreachable!()
2365 };
2366
2367 let (neg, neg_lifetime) =
2368 if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
2369 debug_assert!(!neg.metadata().location_kind.is_top_level());
2370 (neg, quote!('static))
2371 } else if neg.metadata().location_kind.is_top_level() {
2372 (neg, quote!('static))
2373 } else {
2374 (neg, quote!('tick))
2375 };
2376
2377 let pos_ident = pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
2378 let neg_ident = neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
2379
2380 let stream_ident =
2381 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2382
2383 match builders_or_callback {
2384 BuildersOrCallback::Builders(graph_builders) => {
2385 let builder = graph_builders.get_dfir_mut(&out_location);
2386 builder.add_dfir(
2387 parse_quote! {
2388 #stream_ident = #operator::<'tick, #neg_lifetime>();
2389 #pos_ident -> [pos]#stream_ident;
2390 #neg_ident -> [neg]#stream_ident;
2391 },
2392 None,
2393 Some(&next_stmt_id.to_string()),
2394 );
2395 }
2396 BuildersOrCallback::Callback(_, node_callback) => {
2397 node_callback(self, next_stmt_id);
2398 }
2399 }
2400
2401 *next_stmt_id += 1;
2402
2403 stream_ident
2404 }
2405
2406 HydroNode::ResolveFutures { input, .. } => {
2407 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2408
2409 let futures_ident =
2410 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2411
2412 match builders_or_callback {
2413 BuildersOrCallback::Builders(graph_builders) => {
2414 let builder = graph_builders.get_dfir_mut(&out_location);
2415 builder.add_dfir(
2416 parse_quote! {
2417 #futures_ident = #input_ident -> resolve_futures();
2418 },
2419 None,
2420 Some(&next_stmt_id.to_string()),
2421 );
2422 }
2423 BuildersOrCallback::Callback(_, node_callback) => {
2424 node_callback(self, next_stmt_id);
2425 }
2426 }
2427
2428 *next_stmt_id += 1;
2429
2430 futures_ident
2431 }
2432
2433 HydroNode::ResolveFuturesOrdered { input, .. } => {
2434 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2435
2436 let futures_ident =
2437 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2438
2439 match builders_or_callback {
2440 BuildersOrCallback::Builders(graph_builders) => {
2441 let builder = graph_builders.get_dfir_mut(&out_location);
2442 builder.add_dfir(
2443 parse_quote! {
2444 #futures_ident = #input_ident -> resolve_futures_ordered();
2445 },
2446 None,
2447 Some(&next_stmt_id.to_string()),
2448 );
2449 }
2450 BuildersOrCallback::Callback(_, node_callback) => {
2451 node_callback(self, next_stmt_id);
2452 }
2453 }
2454
2455 *next_stmt_id += 1;
2456
2457 futures_ident
2458 }
2459
2460 HydroNode::Map { f, input, .. } => {
2461 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2462
2463 let map_ident =
2464 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2465
2466 match builders_or_callback {
2467 BuildersOrCallback::Builders(graph_builders) => {
2468 let builder = graph_builders.get_dfir_mut(&out_location);
2469 builder.add_dfir(
2470 parse_quote! {
2471 #map_ident = #input_ident -> map(#f);
2472 },
2473 None,
2474 Some(&next_stmt_id.to_string()),
2475 );
2476 }
2477 BuildersOrCallback::Callback(_, node_callback) => {
2478 node_callback(self, next_stmt_id);
2479 }
2480 }
2481
2482 *next_stmt_id += 1;
2483
2484 map_ident
2485 }
2486
2487 HydroNode::FlatMap { f, input, .. } => {
2488 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2489
2490 let flat_map_ident =
2491 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2492
2493 match builders_or_callback {
2494 BuildersOrCallback::Builders(graph_builders) => {
2495 let builder = graph_builders.get_dfir_mut(&out_location);
2496 builder.add_dfir(
2497 parse_quote! {
2498 #flat_map_ident = #input_ident -> flat_map(#f);
2499 },
2500 None,
2501 Some(&next_stmt_id.to_string()),
2502 );
2503 }
2504 BuildersOrCallback::Callback(_, node_callback) => {
2505 node_callback(self, next_stmt_id);
2506 }
2507 }
2508
2509 *next_stmt_id += 1;
2510
2511 flat_map_ident
2512 }
2513
2514 HydroNode::Filter { f, input, .. } => {
2515 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2516
2517 let filter_ident =
2518 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2519
2520 match builders_or_callback {
2521 BuildersOrCallback::Builders(graph_builders) => {
2522 let builder = graph_builders.get_dfir_mut(&out_location);
2523 builder.add_dfir(
2524 parse_quote! {
2525 #filter_ident = #input_ident -> filter(#f);
2526 },
2527 None,
2528 Some(&next_stmt_id.to_string()),
2529 );
2530 }
2531 BuildersOrCallback::Callback(_, node_callback) => {
2532 node_callback(self, next_stmt_id);
2533 }
2534 }
2535
2536 *next_stmt_id += 1;
2537
2538 filter_ident
2539 }
2540
2541 HydroNode::FilterMap { f, input, .. } => {
2542 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2543
2544 let filter_map_ident =
2545 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2546
2547 match builders_or_callback {
2548 BuildersOrCallback::Builders(graph_builders) => {
2549 let builder = graph_builders.get_dfir_mut(&out_location);
2550 builder.add_dfir(
2551 parse_quote! {
2552 #filter_map_ident = #input_ident -> filter_map(#f);
2553 },
2554 None,
2555 Some(&next_stmt_id.to_string()),
2556 );
2557 }
2558 BuildersOrCallback::Callback(_, node_callback) => {
2559 node_callback(self, next_stmt_id);
2560 }
2561 }
2562
2563 *next_stmt_id += 1;
2564
2565 filter_map_ident
2566 }
2567
2568 HydroNode::Sort { input, .. } => {
2569 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2570
2571 let sort_ident =
2572 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2573
2574 match builders_or_callback {
2575 BuildersOrCallback::Builders(graph_builders) => {
2576 let builder = graph_builders.get_dfir_mut(&out_location);
2577 builder.add_dfir(
2578 parse_quote! {
2579 #sort_ident = #input_ident -> sort();
2580 },
2581 None,
2582 Some(&next_stmt_id.to_string()),
2583 );
2584 }
2585 BuildersOrCallback::Callback(_, node_callback) => {
2586 node_callback(self, next_stmt_id);
2587 }
2588 }
2589
2590 *next_stmt_id += 1;
2591
2592 sort_ident
2593 }
2594
2595 HydroNode::DeferTick { input, .. } => {
2596 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2597
2598 let defer_tick_ident =
2599 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2600
2601 match builders_or_callback {
2602 BuildersOrCallback::Builders(graph_builders) => {
2603 let builder = graph_builders.get_dfir_mut(&out_location);
2604 builder.add_dfir(
2605 parse_quote! {
2606 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2607 },
2608 None,
2609 Some(&next_stmt_id.to_string()),
2610 );
2611 }
2612 BuildersOrCallback::Callback(_, node_callback) => {
2613 node_callback(self, next_stmt_id);
2614 }
2615 }
2616
2617 *next_stmt_id += 1;
2618
2619 defer_tick_ident
2620 }
2621
2622 HydroNode::Enumerate { input, .. } => {
2623 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2624
2625 let enumerate_ident =
2626 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2627
2628 match builders_or_callback {
2629 BuildersOrCallback::Builders(graph_builders) => {
2630 let builder = graph_builders.get_dfir_mut(&out_location);
2631 let lifetime = if input.metadata().location_kind.is_top_level() {
2632 quote!('static)
2633 } else {
2634 quote!('tick)
2635 };
2636 builder.add_dfir(
2637 parse_quote! {
2638 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2639 },
2640 None,
2641 Some(&next_stmt_id.to_string()),
2642 );
2643 }
2644 BuildersOrCallback::Callback(_, node_callback) => {
2645 node_callback(self, next_stmt_id);
2646 }
2647 }
2648
2649 *next_stmt_id += 1;
2650
2651 enumerate_ident
2652 }
2653
2654 HydroNode::Inspect { f, input, .. } => {
2655 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2656
2657 let inspect_ident =
2658 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2659
2660 match builders_or_callback {
2661 BuildersOrCallback::Builders(graph_builders) => {
2662 let builder = graph_builders.get_dfir_mut(&out_location);
2663 builder.add_dfir(
2664 parse_quote! {
2665 #inspect_ident = #input_ident -> inspect(#f);
2666 },
2667 None,
2668 Some(&next_stmt_id.to_string()),
2669 );
2670 }
2671 BuildersOrCallback::Callback(_, node_callback) => {
2672 node_callback(self, next_stmt_id);
2673 }
2674 }
2675
2676 *next_stmt_id += 1;
2677
2678 inspect_ident
2679 }
2680
2681 HydroNode::Unique { input, .. } => {
2682 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2683
2684 let unique_ident =
2685 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2686
2687 match builders_or_callback {
2688 BuildersOrCallback::Builders(graph_builders) => {
2689 let builder = graph_builders.get_dfir_mut(&out_location);
2690 let lifetime = if input.metadata().location_kind.is_top_level() {
2691 quote!('static)
2692 } else {
2693 quote!('tick)
2694 };
2695
2696 builder.add_dfir(
2697 parse_quote! {
2698 #unique_ident = #input_ident -> unique::<#lifetime>();
2699 },
2700 None,
2701 Some(&next_stmt_id.to_string()),
2702 );
2703 }
2704 BuildersOrCallback::Callback(_, node_callback) => {
2705 node_callback(self, next_stmt_id);
2706 }
2707 }
2708
2709 *next_stmt_id += 1;
2710
2711 unique_ident
2712 }
2713
2714 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2715 let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2716 parse_quote!(fold)
2717 } else if matches!(self, HydroNode::Scan { .. }) {
2718 parse_quote!(scan)
2719 } else {
2720 parse_quote!(fold_keyed)
2721 };
2722
2723 let (HydroNode::Fold {
2724 init, acc, input, ..
2725 }
2726 | HydroNode::FoldKeyed {
2727 init, acc, input, ..
2728 }
2729 | HydroNode::Scan {
2730 init, acc, input, ..
2731 }) = self
2732 else {
2733 unreachable!()
2734 };
2735
2736 let (input, lifetime) =
2737 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2738 debug_assert!(!input.metadata().location_kind.is_top_level());
2739 (input, quote!('static))
2740 } else if input.metadata().location_kind.is_top_level() {
2741 (input, quote!('static))
2742 } else {
2743 (input, quote!('tick))
2744 };
2745
2746 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2747
2748 let fold_ident =
2749 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2750
2751 match builders_or_callback {
2752 BuildersOrCallback::Builders(graph_builders) => {
2753 let builder = graph_builders.get_dfir_mut(&out_location);
2754 builder.add_dfir(
2755 parse_quote! {
2756 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
2757 },
2758 None,
2759 Some(&next_stmt_id.to_string()),
2760 );
2761 }
2762 BuildersOrCallback::Callback(_, node_callback) => {
2763 node_callback(self, next_stmt_id);
2764 }
2765 }
2766
2767 *next_stmt_id += 1;
2768
2769 fold_ident
2770 }
2771
2772 HydroNode::ReduceKeyedWatermark {
2773 f,
2774 input,
2775 watermark,
2776 ..
2777 } => {
2778 let (input, lifetime) =
2779 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2780 debug_assert!(!input.metadata().location_kind.is_top_level());
2781 (input, quote!('static))
2782 } else if input.metadata().location_kind.is_top_level() {
2783 (input, quote!('static))
2784 } else {
2785 (input, quote!('tick))
2786 };
2787
2788 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2789
2790 let watermark_ident =
2791 watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
2792
2793 let chain_ident = syn::Ident::new(
2794 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
2795 Span::call_site(),
2796 );
2797
2798 let fold_ident =
2799 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2800
2801 match builders_or_callback {
2802 BuildersOrCallback::Builders(graph_builders) => {
2803 let builder = graph_builders.get_dfir_mut(&out_location);
2804 builder.add_dfir(
2809 parse_quote! {
2810 #chain_ident = chain();
2811 #input_ident
2812 -> map(|x| (Some(x), None))
2813 -> [0]#chain_ident;
2814 #watermark_ident
2815 -> map(|watermark| (None, Some(watermark)))
2816 -> [1]#chain_ident;
2817
2818 #fold_ident = #chain_ident
2819 -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
2820 let __reduce_keyed_fn = #f;
2821 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
2822 if let Some((k, v)) = opt_payload {
2823 if let Some(curr_watermark) = *opt_curr_watermark {
2824 if k <= curr_watermark {
2825 return;
2826 }
2827 }
2828 match map.entry(k) {
2829 ::std::collections::hash_map::Entry::Vacant(e) => {
2830 e.insert(v);
2831 }
2832 ::std::collections::hash_map::Entry::Occupied(mut e) => {
2833 __reduce_keyed_fn(e.get_mut(), v);
2834 }
2835 }
2836 } else {
2837 let watermark = opt_watermark.unwrap();
2838 if let Some(curr_watermark) = *opt_curr_watermark {
2839 if watermark <= curr_watermark {
2840 return;
2841 }
2842 }
2843 *opt_curr_watermark = opt_watermark;
2844 map.retain(|k, _| *k > watermark);
2845 }
2846 }
2847 })
2848 -> flat_map(|(map, _curr_watermark)| map);
2849 },
2850 None,
2851 Some(&next_stmt_id.to_string()),
2852 );
2853 }
2854 BuildersOrCallback::Callback(_, node_callback) => {
2855 node_callback(self, next_stmt_id);
2856 }
2857 }
2858
2859 *next_stmt_id += 1;
2860
2861 fold_ident
2862 }
2863
2864 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
2865 let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
2866 parse_quote!(reduce)
2867 } else {
2868 parse_quote!(reduce_keyed)
2869 };
2870
2871 let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
2872 self
2873 else {
2874 unreachable!()
2875 };
2876
2877 let (input, lifetime) =
2878 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2879 debug_assert!(!input.metadata().location_kind.is_top_level());
2880 (input, quote!('static))
2881 } else if input.metadata().location_kind.is_top_level() {
2882 (input, quote!('static))
2883 } else {
2884 (input, quote!('tick))
2885 };
2886
2887 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2888
2889 let reduce_ident =
2890 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2891
2892 match builders_or_callback {
2893 BuildersOrCallback::Builders(graph_builders) => {
2894 let builder = graph_builders.get_dfir_mut(&out_location);
2895 builder.add_dfir(
2896 parse_quote! {
2897 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
2898 },
2899 None,
2900 Some(&next_stmt_id.to_string()),
2901 );
2902 }
2903 BuildersOrCallback::Callback(_, node_callback) => {
2904 node_callback(self, next_stmt_id);
2905 }
2906 }
2907
2908 *next_stmt_id += 1;
2909
2910 reduce_ident
2911 }
2912
2913 HydroNode::Network {
2914 serialize_fn: serialize_pipeline,
2915 instantiate_fn,
2916 deserialize_fn: deserialize_pipeline,
2917 input,
2918 ..
2919 } => {
2920 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2921
2922 let receiver_stream_ident =
2923 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2924
2925 match builders_or_callback {
2926 BuildersOrCallback::Builders(graph_builders) => {
2927 let (sink_expr, source_expr) = match instantiate_fn {
2928 DebugInstantiate::Building => (
2929 syn::parse_quote!(DUMMY_SINK),
2930 syn::parse_quote!(DUMMY_SOURCE),
2931 ),
2932
2933 DebugInstantiate::Finalized(finalized) => {
2934 (finalized.sink.clone(), finalized.source.clone())
2935 }
2936 };
2937
2938 graph_builders.create_network(
2939 &input.metadata().location_kind,
2940 &out_location,
2941 input_ident,
2942 &receiver_stream_ident,
2943 serialize_pipeline,
2944 sink_expr,
2945 source_expr,
2946 deserialize_pipeline,
2947 *next_stmt_id,
2948 );
2949 }
2950 BuildersOrCallback::Callback(_, node_callback) => {
2951 node_callback(self, next_stmt_id);
2952 }
2953 }
2954
2955 *next_stmt_id += 1;
2956
2957 receiver_stream_ident
2958 }
2959
2960 HydroNode::ExternalInput {
2961 instantiate_fn,
2962 deserialize_fn: deserialize_pipeline,
2963 ..
2964 } => {
2965 let receiver_stream_ident =
2966 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2967
2968 match builders_or_callback {
2969 BuildersOrCallback::Builders(graph_builders) => {
2970 let (_, source_expr) = match instantiate_fn {
2971 DebugInstantiate::Building => (
2972 syn::parse_quote!(DUMMY_SINK),
2973 syn::parse_quote!(DUMMY_SOURCE),
2974 ),
2975
2976 DebugInstantiate::Finalized(finalized) => {
2977 (finalized.sink.clone(), finalized.source.clone())
2978 }
2979 };
2980
2981 graph_builders.create_external_source(
2982 &out_location,
2983 source_expr,
2984 &receiver_stream_ident,
2985 deserialize_pipeline,
2986 *next_stmt_id,
2987 );
2988 }
2989 BuildersOrCallback::Callback(_, node_callback) => {
2990 node_callback(self, next_stmt_id);
2991 }
2992 }
2993
2994 *next_stmt_id += 1;
2995
2996 receiver_stream_ident
2997 }
2998
2999 HydroNode::Counter {
3000 tag,
3001 duration,
3002 prefix,
3003 input,
3004 ..
3005 } => {
3006 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3007
3008 let counter_ident =
3009 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3010
3011 match builders_or_callback {
3012 BuildersOrCallback::Builders(graph_builders) => {
3013 let builder = graph_builders.get_dfir_mut(&out_location);
3014 builder.add_dfir(
3015 parse_quote! {
3016 #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3017 },
3018 None,
3019 Some(&next_stmt_id.to_string()),
3020 );
3021 }
3022 BuildersOrCallback::Callback(_, node_callback) => {
3023 node_callback(self, next_stmt_id);
3024 }
3025 }
3026
3027 *next_stmt_id += 1;
3028
3029 counter_ident
3030 }
3031 }
3032 }
3033
3034 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3035 match self {
3036 HydroNode::Placeholder => {
3037 panic!()
3038 }
3039 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3040 HydroNode::Source { source, .. } => match source {
3041 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3042 HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
3043 },
3044 HydroNode::CycleSource { .. }
3045 | HydroNode::Tee { .. }
3046 | HydroNode::Persist { .. }
3047 | HydroNode::YieldConcat { .. }
3048 | HydroNode::BeginAtomic { .. }
3049 | HydroNode::EndAtomic { .. }
3050 | HydroNode::Batch { .. }
3051 | HydroNode::Chain { .. }
3052 | HydroNode::ChainFirst { .. }
3053 | HydroNode::CrossProduct { .. }
3054 | HydroNode::CrossSingleton { .. }
3055 | HydroNode::ResolveFutures { .. }
3056 | HydroNode::ResolveFuturesOrdered { .. }
3057 | HydroNode::Join { .. }
3058 | HydroNode::Difference { .. }
3059 | HydroNode::AntiJoin { .. }
3060 | HydroNode::DeferTick { .. }
3061 | HydroNode::Enumerate { .. }
3062 | HydroNode::Unique { .. }
3063 | HydroNode::Sort { .. } => {}
3064 HydroNode::Map { f, .. }
3065 | HydroNode::FlatMap { f, .. }
3066 | HydroNode::Filter { f, .. }
3067 | HydroNode::FilterMap { f, .. }
3068 | HydroNode::Inspect { f, .. }
3069 | HydroNode::Reduce { f, .. }
3070 | HydroNode::ReduceKeyed { f, .. }
3071 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3072 transform(f);
3073 }
3074 HydroNode::Fold { init, acc, .. }
3075 | HydroNode::Scan { init, acc, .. }
3076 | HydroNode::FoldKeyed { init, acc, .. } => {
3077 transform(init);
3078 transform(acc);
3079 }
3080 HydroNode::Network {
3081 serialize_fn,
3082 deserialize_fn,
3083 ..
3084 } => {
3085 if let Some(serialize_fn) = serialize_fn {
3086 transform(serialize_fn);
3087 }
3088 if let Some(deserialize_fn) = deserialize_fn {
3089 transform(deserialize_fn);
3090 }
3091 }
3092 HydroNode::ExternalInput { deserialize_fn, .. } => {
3093 if let Some(deserialize_fn) = deserialize_fn {
3094 transform(deserialize_fn);
3095 }
3096 }
3097 HydroNode::Counter { duration, .. } => {
3098 transform(duration);
3099 }
3100 }
3101 }
3102
3103 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3104 &self.metadata().op
3105 }
3106
3107 pub fn metadata(&self) -> &HydroIrMetadata {
3108 match self {
3109 HydroNode::Placeholder => {
3110 panic!()
3111 }
3112 HydroNode::Cast { metadata, .. } => metadata,
3113 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3114 HydroNode::Source { metadata, .. } => metadata,
3115 HydroNode::CycleSource { metadata, .. } => metadata,
3116 HydroNode::Tee { metadata, .. } => metadata,
3117 HydroNode::Persist { metadata, .. } => metadata,
3118 HydroNode::YieldConcat { metadata, .. } => metadata,
3119 HydroNode::BeginAtomic { metadata, .. } => metadata,
3120 HydroNode::EndAtomic { metadata, .. } => metadata,
3121 HydroNode::Batch { metadata, .. } => metadata,
3122 HydroNode::Chain { metadata, .. } => metadata,
3123 HydroNode::ChainFirst { metadata, .. } => metadata,
3124 HydroNode::CrossProduct { metadata, .. } => metadata,
3125 HydroNode::CrossSingleton { metadata, .. } => metadata,
3126 HydroNode::Join { metadata, .. } => metadata,
3127 HydroNode::Difference { metadata, .. } => metadata,
3128 HydroNode::AntiJoin { metadata, .. } => metadata,
3129 HydroNode::ResolveFutures { metadata, .. } => metadata,
3130 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3131 HydroNode::Map { metadata, .. } => metadata,
3132 HydroNode::FlatMap { metadata, .. } => metadata,
3133 HydroNode::Filter { metadata, .. } => metadata,
3134 HydroNode::FilterMap { metadata, .. } => metadata,
3135 HydroNode::DeferTick { metadata, .. } => metadata,
3136 HydroNode::Enumerate { metadata, .. } => metadata,
3137 HydroNode::Inspect { metadata, .. } => metadata,
3138 HydroNode::Unique { metadata, .. } => metadata,
3139 HydroNode::Sort { metadata, .. } => metadata,
3140 HydroNode::Scan { metadata, .. } => metadata,
3141 HydroNode::Fold { metadata, .. } => metadata,
3142 HydroNode::FoldKeyed { metadata, .. } => metadata,
3143 HydroNode::Reduce { metadata, .. } => metadata,
3144 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3145 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3146 HydroNode::ExternalInput { metadata, .. } => metadata,
3147 HydroNode::Network { metadata, .. } => metadata,
3148 HydroNode::Counter { metadata, .. } => metadata,
3149 }
3150 }
3151
3152 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3153 &mut self.metadata_mut().op
3154 }
3155
3156 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3157 match self {
3158 HydroNode::Placeholder => {
3159 panic!()
3160 }
3161 HydroNode::Cast { metadata, .. } => metadata,
3162 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3163 HydroNode::Source { metadata, .. } => metadata,
3164 HydroNode::CycleSource { metadata, .. } => metadata,
3165 HydroNode::Tee { metadata, .. } => metadata,
3166 HydroNode::Persist { metadata, .. } => metadata,
3167 HydroNode::YieldConcat { metadata, .. } => metadata,
3168 HydroNode::BeginAtomic { metadata, .. } => metadata,
3169 HydroNode::EndAtomic { metadata, .. } => metadata,
3170 HydroNode::Batch { metadata, .. } => metadata,
3171 HydroNode::Chain { metadata, .. } => metadata,
3172 HydroNode::ChainFirst { metadata, .. } => metadata,
3173 HydroNode::CrossProduct { metadata, .. } => metadata,
3174 HydroNode::CrossSingleton { metadata, .. } => metadata,
3175 HydroNode::Join { metadata, .. } => metadata,
3176 HydroNode::Difference { metadata, .. } => metadata,
3177 HydroNode::AntiJoin { metadata, .. } => metadata,
3178 HydroNode::ResolveFutures { metadata, .. } => metadata,
3179 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3180 HydroNode::Map { metadata, .. } => metadata,
3181 HydroNode::FlatMap { metadata, .. } => metadata,
3182 HydroNode::Filter { metadata, .. } => metadata,
3183 HydroNode::FilterMap { metadata, .. } => metadata,
3184 HydroNode::DeferTick { metadata, .. } => metadata,
3185 HydroNode::Enumerate { metadata, .. } => metadata,
3186 HydroNode::Inspect { metadata, .. } => metadata,
3187 HydroNode::Unique { metadata, .. } => metadata,
3188 HydroNode::Sort { metadata, .. } => metadata,
3189 HydroNode::Scan { metadata, .. } => metadata,
3190 HydroNode::Fold { metadata, .. } => metadata,
3191 HydroNode::FoldKeyed { metadata, .. } => metadata,
3192 HydroNode::Reduce { metadata, .. } => metadata,
3193 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3194 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3195 HydroNode::ExternalInput { metadata, .. } => metadata,
3196 HydroNode::Network { metadata, .. } => metadata,
3197 HydroNode::Counter { metadata, .. } => metadata,
3198 }
3199 }
3200
3201 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3202 match self {
3203 HydroNode::Placeholder => {
3204 panic!()
3205 }
3206 HydroNode::Source { .. }
3207 | HydroNode::ExternalInput { .. }
3208 | HydroNode::CycleSource { .. } | HydroNode::Tee { .. } => {
3210 vec![]
3211 }
3212 HydroNode::Cast { inner, .. }
3213 | HydroNode::ObserveNonDet { inner, .. }
3214 | HydroNode::Persist { inner, .. }
3215 | HydroNode::YieldConcat { inner, .. }
3216 | HydroNode::BeginAtomic { inner, .. }
3217 | HydroNode::EndAtomic { inner, .. }
3218 | HydroNode::Batch { inner, .. } => {
3219 vec![inner.metadata()]
3220 }
3221 HydroNode::Chain { first, second, .. } => {
3222 vec![first.metadata(), second.metadata()]
3223 }
3224 HydroNode::ChainFirst { first, second, .. } => {
3225 vec![first.metadata(), second.metadata()]
3226 }
3227 HydroNode::CrossProduct { left, right, .. }
3228 | HydroNode::CrossSingleton { left, right, .. }
3229 | HydroNode::Join { left, right, .. } => {
3230 vec![left.metadata(), right.metadata()]
3231 }
3232 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3233 vec![pos.metadata(), neg.metadata()]
3234 }
3235 HydroNode::Map { input, .. }
3236 | HydroNode::FlatMap { input, .. }
3237 | HydroNode::Filter { input, .. }
3238 | HydroNode::FilterMap { input, .. }
3239 | HydroNode::Sort { input, .. }
3240 | HydroNode::DeferTick { input, .. }
3241 | HydroNode::Enumerate { input, .. }
3242 | HydroNode::Inspect { input, .. }
3243 | HydroNode::Unique { input, .. }
3244 | HydroNode::Network { input, .. }
3245 | HydroNode::Counter { input, .. }
3246 | HydroNode::ResolveFutures { input, .. }
3247 | HydroNode::ResolveFuturesOrdered { input, .. } => {
3248 vec![input.metadata()]
3249 }
3250 HydroNode::Fold { input, .. }
3251 | HydroNode::FoldKeyed { input, .. }
3252 | HydroNode::Reduce { input, .. }
3253 | HydroNode::ReduceKeyed { input, .. }
3254 | HydroNode::Scan { input, .. } => {
3255 if let HydroNode::Persist { inner, .. } = input.as_ref() {
3257 vec![inner.metadata()]
3258 } else {
3259 vec![input.metadata()]
3260 }
3261 }
3262 HydroNode::ReduceKeyedWatermark { input, watermark, .. } => {
3263 if let HydroNode::Persist { inner, .. } = input.as_ref() {
3265 vec![inner.metadata(), watermark.metadata()]
3266 } else {
3267 vec![input.metadata(), watermark.metadata()]
3268 }
3269 }
3270 }
3271 }
3272
3273 pub fn print_root(&self) -> String {
3274 match self {
3275 HydroNode::Placeholder => {
3276 panic!()
3277 }
3278 HydroNode::Cast { .. } => "Cast()".to_string(),
3279 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3280 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3281 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3282 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3283 HydroNode::Persist { .. } => "Persist()".to_string(),
3284 HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3285 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3286 HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3287 HydroNode::Batch { .. } => "Batch()".to_string(),
3288 HydroNode::Chain { first, second, .. } => {
3289 format!("Chain({}, {})", first.print_root(), second.print_root())
3290 }
3291 HydroNode::ChainFirst { first, second, .. } => {
3292 format!(
3293 "ChainFirst({}, {})",
3294 first.print_root(),
3295 second.print_root()
3296 )
3297 }
3298 HydroNode::CrossProduct { left, right, .. } => {
3299 format!(
3300 "CrossProduct({}, {})",
3301 left.print_root(),
3302 right.print_root()
3303 )
3304 }
3305 HydroNode::CrossSingleton { left, right, .. } => {
3306 format!(
3307 "CrossSingleton({}, {})",
3308 left.print_root(),
3309 right.print_root()
3310 )
3311 }
3312 HydroNode::Join { left, right, .. } => {
3313 format!("Join({}, {})", left.print_root(), right.print_root())
3314 }
3315 HydroNode::Difference { pos, neg, .. } => {
3316 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3317 }
3318 HydroNode::AntiJoin { pos, neg, .. } => {
3319 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3320 }
3321 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3322 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3323 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3324 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3325 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3326 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3327 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3328 HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3329 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3330 HydroNode::Unique { .. } => "Unique()".to_string(),
3331 HydroNode::Sort { .. } => "Sort()".to_string(),
3332 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3333 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3334 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3335 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3336 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3337 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3338 HydroNode::Network { .. } => "Network()".to_string(),
3339 HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3340 HydroNode::Counter { tag, duration, .. } => {
3341 format!("Counter({:?}, {:?})", tag, duration)
3342 }
3343 }
3344 }
3345}
3346
/// Sets up a network channel between two deployed locations.
///
/// The `Deploy` hooks used depend on whether each endpoint is a process or a
/// cluster: `o2o` (process to process), `o2m` (process to cluster), `m2o`
/// (cluster to process) or `m2m` (cluster to cluster). A port is allocated on
/// the sender first, then on the receiver.
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// `*_sink_source` hook and the boxed closure produced by the matching
/// `*_connect` hook.
///
/// # Panics
///
/// Panics if a referenced process or cluster id is absent from `processes` /
/// `clusters`, or if either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    compile_env: &D::CompileEnv,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Look up (and clone) a process handle, panicking if it was never instantiated.
    let process_node = |id: &usize| -> D::Process {
        match processes.get(id) {
            Some(node) => node.clone(),
            None => panic!("A process used in the graph was not instantiated: {}", id),
        }
    };
    // Same lookup for cluster handles.
    let cluster_node = |id: &usize| -> D::Cluster {
        match clusters.get(id) {
            Some(node) => node.clone(),
            None => panic!("A cluster used in the graph was not instantiated: {}", id),
        }
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let sender = process_node(from);
            let receiver = process_node(to);

            let sink_port = D::allocate_process_port(&sender);
            let source_port = D::allocate_process_port(&receiver);

            let endpoints =
                D::o2o_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port);
            let connect = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let sender = process_node(from);
            let receiver = cluster_node(to);

            let sink_port = D::allocate_process_port(&sender);
            let source_port = D::allocate_cluster_port(&receiver);

            let endpoints =
                D::o2m_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port);
            let connect = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let sender = cluster_node(from);
            let receiver = process_node(to);

            let sink_port = D::allocate_cluster_port(&sender);
            let source_port = D::allocate_process_port(&receiver);

            let endpoints =
                D::m2o_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port);
            let connect = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let sender = cluster_node(from);
            let receiver = cluster_node(to);

            let sink_port = D::allocate_cluster_port(&sender);
            let source_port = D::allocate_cluster_port(&receiver);

            let endpoints =
                D::m2m_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port);
            let connect = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        // Tick and atomic locations are rejected on either side.
        (LocationId::Tick(..) | LocationId::Atomic(..), _)
        | (_, LocationId::Tick(..) | LocationId::Atomic(..)) => panic!(),
    };

    (sink, source, connect_fn)
}
3454
// Unit tests: size guards for the IR enums and checks for `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Pins the in-memory size of `HydroNode`; fails whenever the size is no longer 264 bytes.
    #[test]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 264);
    }

    // Pins the in-memory size of `HydroRoot`; fails whenever the size is no longer 160 bytes.
    #[test]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 160);
    }

    // A plain expression parsed from "x + y" must come back from the simplifier unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Feeds the simplifier the expression spliced from a real `q!` closure and
    // compares the resulting tokens against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Same as above, but the quoted code is a block that ends in a `move` closure,
    // so the quoted expression does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}