hydro_lang/
ir.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::NetworkHint;
25#[cfg(stageleft_runtime)]
26use crate::backtrace::Backtrace;
27#[cfg(feature = "build")]
28use crate::deploy::{Deploy, RegisterPort};
29use crate::location::LocationId;
30
/// Wrapper around a `syn::Expr` whose `Debug` output is the expression's tokens.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
36
37impl From<syn::Expr> for DebugExpr {
38    fn from(expr: syn::Expr) -> Self {
39        Self(Box::new(expr))
40    }
41}
42
43impl Deref for DebugExpr {
44    type Target = syn::Expr;
45
46    fn deref(&self) -> &Self::Target {
47        &self.0
48    }
49}
50
51impl ToTokens for DebugExpr {
52    fn to_tokens(&self, tokens: &mut TokenStream) {
53        self.0.to_tokens(tokens);
54    }
55}
56
57impl Debug for DebugExpr {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        write!(f, "{}", self.0.to_token_stream())
60    }
61}
62
63impl Display for DebugExpr {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        let original = self.0.as_ref().clone();
66        let simplified = simplify_q_macro(original);
67
68        // For now, just use quote formatting without trying to parse as a statement
69        // This avoids the syn::parse_quote! issues entirely
70        write!(f, "q!({})", quote::quote!(#simplified))
71    }
72}
73
74/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
75fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
76    // Try to parse the token string as a syn::Expr
77    // Use a visitor to simplify q! macro expansions
78    let mut simplifier = QMacroSimplifier::new();
79    simplifier.visit_expr_mut(&mut expr);
80
81    // If we found and simplified a q! macro, return the simplified version
82    if let Some(simplified) = simplifier.simplified_result {
83        simplified
84    } else {
85        expr
86    }
87}
88
/// AST visitor that simplifies q! macro expansions.
///
/// The visitor records at most one result: once `simplified_result` is set,
/// further visits return immediately.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// Expression extracted from the first matching
    /// `stageleft::runtime_support::*_type_hint*` call, or `None` if no such
    /// call has been visited yet.
    pub simplified_result: Option<syn::Expr>,
}
94
95impl QMacroSimplifier {
96    pub fn new() -> Self {
97        Self::default()
98    }
99}
100
101impl VisitMut for QMacroSimplifier {
102    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
103        // Check if we already found a result to avoid further processing
104        if self.simplified_result.is_some() {
105            return;
106        }
107
108        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
109            // Look for calls to stageleft::runtime_support::fn*
110            && self.is_stageleft_runtime_support_call(&path_expr.path)
111            // Try to extract the closure from the arguments
112            && let Some(closure) = self.extract_closure_from_args(&call.args)
113        {
114            self.simplified_result = Some(closure);
115            return;
116        }
117
118        // Continue visiting child expressions using the default implementation
119        // Use the default visitor to avoid infinite recursion
120        syn::visit_mut::visit_expr_mut(self, expr);
121    }
122}
123
124impl QMacroSimplifier {
125    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
126        // Check if this is a call to stageleft::runtime_support::fn*
127        if let Some(last_segment) = path.segments.last() {
128            let fn_name = last_segment.ident.to_string();
129            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
130            fn_name.contains("_type_hint")
131                && path.segments.len() > 2
132                && path.segments[0].ident == "stageleft"
133                && path.segments[1].ident == "runtime_support"
134        } else {
135            false
136        }
137    }
138
139    fn extract_closure_from_args(
140        &self,
141        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
142    ) -> Option<syn::Expr> {
143        // Look through the arguments for a closure expression
144        for arg in args {
145            if let syn::Expr::Closure(_) = arg {
146                return Some(arg.clone());
147            }
148            // Also check for closures nested in other expressions (like blocks)
149            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
150                return Some(closure_expr);
151            }
152        }
153        None
154    }
155
156    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
157        let mut visitor = ClosureFinder {
158            found_closure: None,
159            prefer_inner_blocks: true,
160        };
161        visitor.visit_expr(expr);
162        visitor.found_closure
163    }
164}
165
/// Visitor that finds closures in expressions with special block handling
struct ClosureFinder {
    /// First match recorded by the visitor: either a closure expression, or a
    /// nested block expression that contains one. `None` until something is found.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression is first checked for directly nested block
    /// statements containing a closure, and such a nested block is recorded instead
    /// of the bare closure.
    prefer_inner_blocks: bool,
}
171
172impl<'ast> Visit<'ast> for ClosureFinder {
173    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
174        // If we already found a closure, don't continue searching
175        if self.found_closure.is_some() {
176            return;
177        }
178
179        match expr {
180            syn::Expr::Closure(_) => {
181                self.found_closure = Some(expr.clone());
182            }
183            syn::Expr::Block(block) if self.prefer_inner_blocks => {
184                // Special handling for blocks - look for inner blocks that contain closures
185                for stmt in &block.block.stmts {
186                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
187                        && let syn::Expr::Block(_) = stmt_expr
188                    {
189                        // Check if this nested block contains a closure
190                        let mut inner_visitor = ClosureFinder {
191                            found_closure: None,
192                            prefer_inner_blocks: false, // Avoid infinite recursion
193                        };
194                        inner_visitor.visit_expr(stmt_expr);
195                        if inner_visitor.found_closure.is_some() {
196                            // Found a closure in an inner block, return that block
197                            self.found_closure = Some(stmt_expr.clone());
198                            return;
199                        }
200                    }
201                }
202
203                // If no inner block with closure found, continue with normal visitation
204                visit::visit_expr(self, expr);
205
206                // If we found a closure, just return the closure itself, not the whole block
207                // unless we're in the special case where we want the containing block
208                if self.found_closure.is_some() {
209                    // The closure was found during visitation, no need to wrap in block
210                }
211            }
212            _ => {
213                // Use default visitor behavior for all other expressions
214                visit::visit_expr(self, expr);
215            }
216        }
217    }
218}
219
/// Wrapper around a `syn::Type` whose `Debug` output is the type's tokens.
///
/// Boxes `syn::Type` which is ~320 bytes.
#[derive(Clone, Hash)]
pub struct DebugType(pub Box<syn::Type>);
225
226impl From<syn::Type> for DebugType {
227    fn from(t: syn::Type) -> Self {
228        Self(Box::new(t))
229    }
230}
231
232impl Deref for DebugType {
233    type Target = syn::Type;
234
235    fn deref(&self) -> &Self::Target {
236        &self.0
237    }
238}
239
240impl ToTokens for DebugType {
241    fn to_tokens(&self, tokens: &mut TokenStream) {
242        self.0.to_tokens(tokens);
243    }
244}
245
246impl Debug for DebugType {
247    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248        write!(f, "{}", self.0.to_token_stream())
249    }
250}
251
/// Instantiation state of a network edge in the IR.
///
/// `HydroLeaf::compile_network` replaces `Building` with `Finalized`, and
/// `HydroLeaf::connect_network` panics if it still finds `Building`.
pub enum DebugInstantiate {
    /// The edge has not been compiled yet.
    Building,
    /// The edge has been compiled; holds its sink/source expressions and connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
256
/// Payload of [`DebugInstantiate::Finalized`]: the compiled form of a network edge.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side of the edge.
    sink: syn::Expr,
    /// Expression for the receiving side of the edge.
    source: syn::Expr,
    /// Deferred connect step; `HydroLeaf::connect_network` takes it out of the
    /// `Option` and calls it, leaving `None` behind.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
269
270impl From<DebugInstantiateFinalized> for DebugInstantiate {
271    fn from(f: DebugInstantiateFinalized) -> Self {
272        Self::Finalized(Box::new(f))
273    }
274}
275
276impl Debug for DebugInstantiate {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        write!(f, "<network instantiate>")
279    }
280}
281
282impl Hash for DebugInstantiate {
283    fn hash<H: Hasher>(&self, _state: &mut H) {
284        // Do nothing
285    }
286}
287
288impl Clone for DebugInstantiate {
289    fn clone(&self) -> Self {
290        match self {
291            DebugInstantiate::Building => DebugInstantiate::Building,
292            DebugInstantiate::Finalized(_) => {
293                panic!("DebugInstantiate::Finalized should not be cloned")
294            }
295        }
296    }
297}
298
/// A source in a Hydro graph, where data enters the graph.
///
/// `Stream` and `Iter` each carry a [`DebugExpr`]; `ExternalNetwork` and `Spin`
/// carry no data.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    Stream(DebugExpr),
    ExternalNetwork(),
    Iter(DebugExpr),
    Spin(),
}
307
/// Selects what `emit_core` does with each element it visits: either add DFIR
/// statements to graph builders, or invoke user callbacks instead.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroLeaf, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Graph builders keyed by location id; `emit_core` adds generated DFIR to them.
    Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
    /// Leaf callback (`L`) and node callback (`N`); each receives the element and
    /// the current statement id counter.
    Callback(L, N),
}
317
/// A leaf in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from leaves.
#[derive(Debug, Hash)]
pub enum HydroLeaf {
    /// Emitted by `emit_core` as `input -> for_each(f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Sends `input` to an external; emitted by `emit_core` as a `dest_sink` on the
    /// sink expression held in `instantiate_fn`, preceded by `map(serialize_fn)`
    /// when `serialize_fn` is present. This variant carries no metadata, and
    /// `metadata()` / `metadata_mut()` panic on it.
    SendExternal {
        to_external_id: usize,
        to_key: usize,
        to_many: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
    },
    /// Emitted by `emit_core` as `input -> dest_sink(sink)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted by `emit_core` as the assignment `ident = input`.
    CycleSink {
        ident: syn::Ident,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
347
348impl HydroLeaf {
    /// Compiles every network edge reachable from this leaf.
    ///
    /// Walks the leaf bottom-up via `transform_bottom_up` and, for each
    /// `HydroLeaf::SendExternal`, `HydroNode::Network` and `HydroNode::ExternalInput`
    /// whose `instantiate_fn` is `DebugInstantiate::Building`, asks the deploy backend
    /// `D` for the sink/source expressions and a connect callback, then stores them
    /// as `DebugInstantiate::Finalized`.
    ///
    /// * `compile_env` - forwarded to the `D::*` sink/source constructors.
    /// * `extra_stmts` - per-process statement lists; only touched for
    ///   `ExternalInput` nodes with `from_many` set.
    /// * `seen_tees` - forwarded to `transform_bottom_up`.
    /// * `processes`, `externals` - instantiated nodes, looked up by id.
    /// * `clusters` - only forwarded to `instantiate_network`.
    ///
    /// # Panics
    ///
    /// Panics if an edge is already `Finalized`, if a referenced external or process
    /// id is missing from the maps, or if the relevant location root is neither a
    /// process nor a cluster. Cluster locations hit `todo!()` for `SendExternal`
    /// and `ExternalInput`.
    #[cfg(feature = "build")]
    pub fn compile_network<'a, D>(
        &mut self,
        compile_env: &D::CompileEnv,
        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
        seen_tees: &mut SeenTees,
        processes: &HashMap<usize, D::Process>,
        clusters: &HashMap<usize, D::Cluster>,
        externals: &HashMap<usize, D::External>,
    ) where
        D: Deploy<'a>,
    {
        self.transform_bottom_up(
            // Leaf transform: only `SendExternal` carries a network edge.
            &mut |l| {
                if let HydroLeaf::SendExternal {
                    input,
                    to_external_id,
                    to_key,
                    to_many,
                    instantiate_fn,
                    ..
                } = l
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            // Looked up before branching on `to_many`, so a missing
                            // external panics on both paths.
                            let to_node = externals
                                .get(to_external_id)
                                .unwrap_or_else(|| {
                                    panic!("A external used in the graph was not instantiated: {}", to_external_id)
                                })
                                .clone();

                            // The sending side is wherever the input lives.
                            match input.metadata().location_kind.root() {
                                LocationId::Process(process_id) => {
                                    if *to_many {
                                        // No ports are allocated on this path and the
                                        // connect step is a no-op.
                                        (
                                            (
                                                D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
                                                // Placeholder: `emit_core` discards the source
                                                // expression of a `SendExternal`.
                                                parse_quote!(DUMMY),
                                            ),
                                            Box::new(|| {}) as Box<dyn FnOnce()>,
                                        )
                                    } else {
                                        let from_node = processes
                                            .get(process_id)
                                            .unwrap_or_else(|| {
                                                panic!("A process used in the graph was not instantiated: {}", process_id)
                                            })
                                            .clone();

                                        let sink_port = D::allocate_process_port(&from_node);
                                        let source_port = D::allocate_external_port(&to_node);

                                        // Record the external-side port under this edge's key.
                                        to_node.register(*to_key, source_port.clone());

                                        (
                                            (
                                                D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
                                                // Placeholder: `emit_core` discards the source
                                                // expression of a `SendExternal`.
                                                parse_quote!(DUMMY),
                                            ),
                                            D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
                                        )
                                    }
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            // Node transform: `Network` and `ExternalInput` carry network edges.
            &mut |n| {
                if let HydroNode::Network {
                    input,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                        // From the input's root location to this node's root location.
                        DebugInstantiate::Building => instantiate_network::<D>(
                            input.metadata().location_kind.root(),
                            metadata.location_kind.root(),
                            processes,
                            clusters,
                            compile_env,
                        ),

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                } else if let HydroNode::ExternalInput {
                    from_external_id,
                    from_key,
                    from_many,
                    codec_type,
                    port_hint,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let from_node = externals
                                .get(from_external_id)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "A external used in the graph was not instantiated: {}",
                                        from_external_id
                                    )
                                })
                                .clone();

                            // The receiving side is this node's own location.
                            match metadata.location_kind.root() {
                                LocationId::Process(process_id) => {
                                    let to_node = processes
                                        .get(process_id)
                                        .unwrap_or_else(|| {
                                            panic!("A process used in the graph was not instantiated: {}", process_id)
                                        })
                                        .clone();

                                    let sink_port = D::allocate_external_port(&from_node);
                                    let source_port = D::allocate_process_port(&to_node);

                                    // Record the external-side port under this edge's key.
                                    from_node.register(*from_key, sink_port.clone());

                                    (
                                        (
                                            // Placeholder sink expression; presumably unused for
                                            // `ExternalInput` - TODO confirm against `HydroNode::emit_core`.
                                            parse_quote!(DUMMY),
                                            if *from_many {
                                                // The many-source constructor may append to this
                                                // process's extra statements.
                                                D::e2o_many_source(
                                                    compile_env,
                                                    extra_stmts.entry(*process_id).or_default(),
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_id, *from_key)
                                                )
                                            } else {
                                                D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port)
                                            },
                                        ),
                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
                                    )
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            seen_tees,
            false,
        );
    }
530
531    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
532        self.transform_bottom_up(
533            &mut |l| {
534                if let HydroLeaf::SendExternal { instantiate_fn, .. } = l {
535                    match instantiate_fn {
536                        DebugInstantiate::Building => panic!("network not built"),
537
538                        DebugInstantiate::Finalized(finalized) => {
539                            (finalized.connect_fn.take().unwrap())();
540                        }
541                    }
542                }
543            },
544            &mut |n| {
545                if let HydroNode::Network { instantiate_fn, .. }
546                | HydroNode::ExternalInput { instantiate_fn, .. } = n
547                {
548                    match instantiate_fn {
549                        DebugInstantiate::Building => panic!("network not built"),
550
551                        DebugInstantiate::Finalized(finalized) => {
552                            (finalized.connect_fn.take().unwrap())();
553                        }
554                    }
555                }
556            },
557            seen_tees,
558            false,
559        );
560    }
561
562    pub fn transform_bottom_up(
563        &mut self,
564        transform_leaf: &mut impl FnMut(&mut HydroLeaf),
565        transform_node: &mut impl FnMut(&mut HydroNode),
566        seen_tees: &mut SeenTees,
567        check_well_formed: bool,
568    ) {
569        self.transform_children(
570            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
571            seen_tees,
572        );
573
574        transform_leaf(self);
575    }
576
577    pub fn transform_children(
578        &mut self,
579        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
580        seen_tees: &mut SeenTees,
581    ) {
582        match self {
583            HydroLeaf::ForEach { input, .. }
584            | HydroLeaf::SendExternal { input, .. }
585            | HydroLeaf::DestSink { input, .. }
586            | HydroLeaf::CycleSink { input, .. } => {
587                transform(input, seen_tees);
588            }
589        }
590    }
591
592    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroLeaf {
593        match self {
594            HydroLeaf::ForEach { f, input, metadata } => HydroLeaf::ForEach {
595                f: f.clone(),
596                input: Box::new(input.deep_clone(seen_tees)),
597                metadata: metadata.clone(),
598            },
599            HydroLeaf::SendExternal {
600                to_external_id,
601                to_key,
602                to_many,
603                serialize_fn,
604                instantiate_fn,
605                input,
606            } => HydroLeaf::SendExternal {
607                to_external_id: *to_external_id,
608                to_key: *to_key,
609                to_many: *to_many,
610                serialize_fn: serialize_fn.clone(),
611                instantiate_fn: instantiate_fn.clone(),
612                input: Box::new(input.deep_clone(seen_tees)),
613            },
614            HydroLeaf::DestSink {
615                sink,
616                input,
617                metadata,
618            } => HydroLeaf::DestSink {
619                sink: sink.clone(),
620                input: Box::new(input.deep_clone(seen_tees)),
621                metadata: metadata.clone(),
622            },
623            HydroLeaf::CycleSink {
624                ident,
625                input,
626                metadata,
627            } => HydroLeaf::CycleSink {
628                ident: ident.clone(),
629                input: Box::new(input.deep_clone(seen_tees)),
630                metadata: metadata.clone(),
631            },
632        }
633    }
634
635    #[cfg(feature = "build")]
636    pub fn emit(
637        &mut self,
638        graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
639        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
640        next_stmt_id: &mut usize,
641    ) {
642        self.emit_core(
643            &mut BuildersOrCallback::Builders::<
644                fn(&mut HydroLeaf, &mut usize),
645                fn(&mut HydroNode, &mut usize),
646            >(graph_builders),
647            built_tees,
648            next_stmt_id,
649        );
650    }
651
    /// Emits this leaf, after first emitting its input subtree.
    ///
    /// In `Builders` mode the leaf's DFIR statement is added to the graph builder
    /// for the input's location id. In `Callback` mode the leaf callback is invoked
    /// with this leaf and the current statement id instead, except for `CycleSink`,
    /// which has no callback.
    ///
    /// `next_stmt_id` is incremented once for `ForEach`, `SendExternal` and
    /// `DestSink`, after the leaf has been handled; `CycleSink` does not touch it
    /// here. `built_tees` is forwarded to `HydroNode::emit_core`.
    ///
    /// # Panics
    ///
    /// In `Builders` mode, panics for a `CycleSink` whose input location id differs
    /// from the id derived from its own metadata.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroLeaf, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroLeaf::ForEach { f, input, .. } => {
                // Emit the input first; it yields the identifier naming its output
                // and the location id it was emitted at.
                let (input_ident, input_location_id) =
                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .entry(input_location_id)
                            .or_default()
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroLeaf::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let (input_ident, input_location_id) =
                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink side is emitted for a send; the source
                        // expression is discarded. An edge that was never compiled
                        // falls back to placeholder identifiers.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        let sender_builder = graph_builders.entry(input_location_id).or_default();
                        if let Some(serialize_fn) = serialize_fn {
                            sender_builder.add_dfir(
                                parse_quote! {
                                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                                },
                                None,
                                // operator tag separates send and receive, which otherwise have the same next_stmt_id
                                Some(&format!("send{}", next_stmt_id)),
                            );
                        } else {
                            sender_builder.add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink_expr);
                                },
                                None,
                                Some(&format!("send{}", next_stmt_id)),
                            );
                        }
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroLeaf::DestSink { sink, input, .. } => {
                let (input_ident, input_location_id) =
                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .entry(input_location_id)
                            .or_default()
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroLeaf::CycleSink {
                ident,
                input,
                metadata,
                ..
            } => {
                let (input_ident, input_location_id) =
                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);

                // The cycle sink's own location, taken from its metadata.
                let location_id = metadata.location_kind.root().raw_id();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        assert_eq!(
                            input_location_id, location_id,
                            "cycle_sink location mismatch"
                        );

                        // Binds the cycle identifier to the input; emitted without
                        // a statement id tag.
                        graph_builders.entry(location_id).or_default().add_dfir(
                            parse_quote! {
                                #ident = #input_ident;
                            },
                            None,
                            None,
                        );
                    }
                    // No ID, no callback
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }
        }
    }
795
796    pub fn metadata(&self) -> &HydroIrMetadata {
797        match self {
798            HydroLeaf::ForEach { metadata, .. }
799            | HydroLeaf::DestSink { metadata, .. }
800            | HydroLeaf::CycleSink { metadata, .. } => metadata,
801            HydroLeaf::SendExternal { .. } => panic!(),
802        }
803    }
804
805    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
806        match self {
807            HydroLeaf::ForEach { metadata, .. }
808            | HydroLeaf::DestSink { metadata, .. }
809            | HydroLeaf::CycleSink { metadata, .. } => metadata,
810            HydroLeaf::SendExternal { .. } => panic!(),
811        }
812    }
813
814    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
815        match self {
816            HydroLeaf::ForEach { input, .. }
817            | HydroLeaf::SendExternal { input, .. }
818            | HydroLeaf::DestSink { input, .. }
819            | HydroLeaf::CycleSink { input, .. } => {
820                vec![input.metadata()]
821            }
822        }
823    }
824
825    pub fn print_root(&self) -> String {
826        match self {
827            HydroLeaf::ForEach { f, .. } => format!("ForEach({:?})", f),
828            HydroLeaf::SendExternal { .. } => "SendExternal".to_string(),
829            HydroLeaf::DestSink { sink, .. } => format!("DestSink({:?})", sink),
830            HydroLeaf::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
831        }
832    }
833
834    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
835        match self {
836            HydroLeaf::ForEach { f, .. } | HydroLeaf::DestSink { sink: f, .. } => {
837                transform(f);
838            }
839            HydroLeaf::SendExternal { .. } | HydroLeaf::CycleSink { .. } => {}
840        }
841    }
842}
843
/// Emits every leaf in `ir` and returns the resulting graph builders.
///
/// The builder map, the table of already-built tees and the statement-id
/// counter (starting at 0) are shared across all leaves.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroLeaf>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut graph_builders = BTreeMap::new();
    let mut tee_idents = HashMap::new();
    let mut stmt_counter = 0;

    ir.iter_mut().for_each(|leaf| {
        leaf.emit(&mut graph_builders, &mut tee_idents, &mut stmt_counter);
    });

    graph_builders
}
854
/// Walks `ir` through `emit_core` in callback mode.
///
/// The two callbacks are wrapped in `BuildersOrCallback::Callback` and handed
/// to each leaf's `emit_core`. The tee table and the statement-id counter
/// (starting at 0) are shared across all leaves.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroLeaf],
    transform_leaf: impl FnMut(&mut HydroLeaf, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_leaf, transform_node);
    let mut visited_tees = HashMap::new();
    let mut stmt_counter = 0;

    for leaf in ir.iter_mut() {
        leaf.emit_core(&mut callback, &mut visited_tees, &mut stmt_counter);
    }
}
868
869pub fn transform_bottom_up(
870    ir: &mut [HydroLeaf],
871    transform_leaf: &mut impl FnMut(&mut HydroLeaf),
872    transform_node: &mut impl FnMut(&mut HydroNode),
873    check_well_formed: bool,
874) {
875    let mut seen_tees = HashMap::new();
876    ir.iter_mut().for_each(|leaf| {
877        leaf.transform_bottom_up(
878            transform_leaf,
879            transform_node,
880            &mut seen_tees,
881            check_well_formed,
882        );
883    });
884}
885
886pub fn deep_clone(ir: &[HydroLeaf]) -> Vec<HydroLeaf> {
887    let mut seen_tees = HashMap::new();
888    ir.iter()
889        .map(|leaf| leaf.deep_clone(&mut seen_tees))
890        .collect()
891}
892
/// State used to de-duplicate tee nodes in `Debug` output.
///
/// `None` means de-duplication is inactive. Otherwise the tuple holds the
/// next id to hand out and the ids already assigned, keyed by the address of
/// each tee's shared cell.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Per-thread state: written by `dbg_dedup_tee` and read/updated by the
    // `Debug` impl of `TeeNode`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
897
898pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
899    PRINTED_TEES.with(|printed_tees| {
900        let mut printed_tees_mut = printed_tees.borrow_mut();
901        *printed_tees_mut = Some((0, HashMap::new()));
902        drop(printed_tees_mut);
903
904        let ret = f();
905
906        let mut printed_tees_mut = printed_tees.borrow_mut();
907        *printed_tees_mut = None;
908
909        ret
910    })
911}
912
/// A shared, interior-mutable handle (`Rc<RefCell<_>>`) to a [`HydroNode`].
///
/// Used as the `inner` field of [`HydroNode::Tee`]. The address of the shared
/// cell (see `as_ptr`) serves as the identity of the tee in the lookup tables
/// used while traversing the IR.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
914
915impl TeeNode {
916    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
917        Rc::as_ptr(&self.0)
918    }
919}
920
921impl Debug for TeeNode {
922    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
923        PRINTED_TEES.with(|printed_tees| {
924            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
925            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
926
927            if let Some(printed_tees_mut) = printed_tees_mut {
928                if let Some(existing) = printed_tees_mut
929                    .1
930                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
931                {
932                    write!(f, "<tee {}>", existing)
933                } else {
934                    let next_id = printed_tees_mut.0;
935                    printed_tees_mut.0 += 1;
936                    printed_tees_mut
937                        .1
938                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
939                    drop(printed_tees_mut_borrow);
940                    write!(f, "<tee {}>: ", next_id)?;
941                    Debug::fmt(&self.0.borrow(), f)
942                }
943            } else {
944                drop(printed_tees_mut_borrow);
945                write!(f, "<tee>: ")?;
946                Debug::fmt(&self.0.borrow(), f)
947            }
948        })
949    }
950}
951
952impl Hash for TeeNode {
953    fn hash<H: Hasher>(&self, state: &mut H) {
954        self.0.borrow_mut().hash(state);
955    }
956}
957
/// Metadata attached to IR nodes and leaves.
///
/// The `Hash` and `PartialEq` impls in this file ignore every field, and the
/// `Debug` impl prints only `location_kind` and `output_type`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location associated with the node. `HydroNode::transform_bottom_up`
    /// compares `location_kind.root()` of a node against that of its inputs
    /// when checking well-formedness.
    pub location_kind: LocationId,
    /// Backtrace value. NOTE(review): presumably captured where the node was
    /// created — confirm.
    pub backtrace: Backtrace,
    /// Optional type of the node's output; shown in the `Debug` output.
    pub output_type: Option<DebugType>,
    /// NOTE(review): presumably an estimated or measured element count —
    /// confirm against the code that sets it.
    pub cardinality: Option<usize>,
    /// NOTE(review): presumably a profiled CPU-usage figure — confirm units
    /// against the code that sets it.
    pub cpu_usage: Option<f64>,
    /// NOTE(review): presumably CPU usage attributed to receiving from the
    /// network — confirm against the code that sets it.
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional numeric identifier. NOTE(review): how it is assigned is not
    /// visible here — confirm.
    pub id: Option<usize>,
    /// Optional free-form string tag.
    pub tag: Option<String>,
}
969
970// HydroIrMetadata shouldn't be used to hash or compare
971impl Hash for HydroIrMetadata {
972    fn hash<H: Hasher>(&self, _: &mut H) {}
973}
974
975impl PartialEq for HydroIrMetadata {
976    fn eq(&self, _: &Self) -> bool {
977        true
978    }
979}
980
// Marker impl: `eq` always returns `true`, so the relation is trivially a
// full equivalence.
impl Eq for HydroIrMetadata {}
982
983impl Debug for HydroIrMetadata {
984    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
985        f.debug_struct("HydroIrMetadata")
986            .field("location_kind", &self.location_kind)
987            .field("output_type", &self.output_type)
988            .finish()
989    }
990}
991
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Every variant except [`HydroNode::Placeholder`] carries a
/// [`HydroIrMetadata`]. Upstream nodes are held as `Box<HydroNode>`, except
/// for [`HydroNode::Tee`], whose upstream is a shared [`TeeNode`].
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in value. `transform_children` and `deep_clone` put it
    /// into a tee's cell while the real node is being processed. Traversals
    /// (`transform_children`, `emit_core`) panic if they encounter it.
    Placeholder,

    /// A data source described by a `HydroSource`; has no upstream nodes.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// The reading end of a cycle named `ident`; has no upstream nodes.
    /// `emit_core` emits no statement for it and returns `ident` directly.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// A reference to a shared upstream node. Emitted as a DFIR `tee()` the
    /// first time the shared node is encountered; later encounters reuse the
    /// identifier recorded for it.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Emitted as DFIR `persist::<'static>()` applied to `inner`.
    Persist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Marker node. `emit_core` panics on it because it is expected to have
    /// been optimized away before emission.
    Unpersist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as DFIR `multiset_delta()` applied to `inner`.
    Delta {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; children are visited `first`, then `second`.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; children are visited `left`, then `right`.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; children are visited `left`, then `right`.
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; children are visited `left`, then `right`.
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; children are visited `pos`, then `neg`.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; children are visited `pos`, then `neg`.
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node with no expression of its own.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node with no expression of its own.
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node parameterized by the expression `f`.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterized by the expression `f`.
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterized by the expression `f`.
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterized by the expression `f`.
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node with no expression of its own.
    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node with an `is_static` flag.
    /// NOTE(review): the effect of `is_static` is not visible here — confirm.
    Enumerate {
        is_static: bool,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterized by the expression `f`.
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node with no expression of its own.
    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node with no expression of its own.
    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterized by the expressions `init` and `acc`.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node parameterized by the expressions `init` and `acc`.
    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterized by the expressions `init` and `acc`.
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node parameterized by the expression `f`.
    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterized by the expression `f`.
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, but with a second upstream node, `watermark`;
    /// children are visited `input`, then `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node carrying optional serialize/deserialize expressions
    /// and an instantiation hook. It is the only variant exempt from the
    /// location check in `HydroNode::transform_bottom_up`.
    Network {
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Input identified by `from_external_id` / `from_key`; has no upstream
    /// `HydroNode` children.
    ExternalInput {
        from_external_id: usize,
        from_key: usize,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node carrying a string `tag` and a `duration` expression.
    Counter {
        tag: String,
        duration: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1181
/// Maps the address of an original tee cell to the cell that replaces it.
///
/// `HydroNode::transform_children` and `HydroNode::deep_clone` use it so that
/// a tee reachable through several paths is processed once and every
/// reference ends up pointing at the same new cell.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a `LocationId`.
/// NOTE(review): its users are outside this part of the file — confirm usage.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1184
1185impl HydroNode {
1186    pub fn transform_bottom_up(
1187        &mut self,
1188        transform: &mut impl FnMut(&mut HydroNode),
1189        seen_tees: &mut SeenTees,
1190        check_well_formed: bool,
1191    ) {
1192        self.transform_children(
1193            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1194            seen_tees,
1195        );
1196
1197        transform(self);
1198
1199        let self_location = self.metadata().location_kind.root();
1200
1201        if check_well_formed {
1202            match &*self {
1203                HydroNode::Network { .. } => {}
1204                _ => {
1205                    self.input_metadata().iter().for_each(|i| {
1206                        if i.location_kind.root() != self_location {
1207                            panic!(
1208                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1209                                i,
1210                                i.location_kind.root(),
1211                                self,
1212                                self_location
1213                            )
1214                        }
1215                    });
1216                }
1217            }
1218        }
1219    }
1220
1221    #[inline(always)]
1222    pub fn transform_children(
1223        &mut self,
1224        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1225        seen_tees: &mut SeenTees,
1226    ) {
1227        match self {
1228            HydroNode::Placeholder => {
1229                panic!();
1230            }
1231
1232            HydroNode::Source { .. }
1233            | HydroNode::CycleSource { .. }
1234            | HydroNode::ExternalInput { .. } => {}
1235
1236            HydroNode::Tee { inner, .. } => {
1237                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1238                    *inner = TeeNode(transformed.clone());
1239                } else {
1240                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1241                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1242                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1243                    transform(&mut orig, seen_tees);
1244                    *transformed_cell.borrow_mut() = orig;
1245                    *inner = TeeNode(transformed_cell);
1246                }
1247            }
1248
1249            HydroNode::Persist { inner, .. }
1250            | HydroNode::Unpersist { inner, .. }
1251            | HydroNode::Delta { inner, .. } => {
1252                transform(inner.as_mut(), seen_tees);
1253            }
1254
1255            HydroNode::Chain { first, second, .. } => {
1256                transform(first.as_mut(), seen_tees);
1257                transform(second.as_mut(), seen_tees);
1258            }
1259
1260            HydroNode::CrossSingleton { left, right, .. }
1261            | HydroNode::CrossProduct { left, right, .. }
1262            | HydroNode::Join { left, right, .. } => {
1263                transform(left.as_mut(), seen_tees);
1264                transform(right.as_mut(), seen_tees);
1265            }
1266
1267            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1268                transform(pos.as_mut(), seen_tees);
1269                transform(neg.as_mut(), seen_tees);
1270            }
1271
1272            HydroNode::ReduceKeyedWatermark {
1273                input, watermark, ..
1274            } => {
1275                transform(input.as_mut(), seen_tees);
1276                transform(watermark.as_mut(), seen_tees);
1277            }
1278
1279            HydroNode::Map { input, .. }
1280            | HydroNode::ResolveFutures { input, .. }
1281            | HydroNode::ResolveFuturesOrdered { input, .. }
1282            | HydroNode::FlatMap { input, .. }
1283            | HydroNode::Filter { input, .. }
1284            | HydroNode::FilterMap { input, .. }
1285            | HydroNode::Sort { input, .. }
1286            | HydroNode::DeferTick { input, .. }
1287            | HydroNode::Enumerate { input, .. }
1288            | HydroNode::Inspect { input, .. }
1289            | HydroNode::Unique { input, .. }
1290            | HydroNode::Network { input, .. }
1291            | HydroNode::Fold { input, .. }
1292            | HydroNode::Scan { input, .. }
1293            | HydroNode::FoldKeyed { input, .. }
1294            | HydroNode::Reduce { input, .. }
1295            | HydroNode::ReduceKeyed { input, .. }
1296            | HydroNode::Counter { input, .. } => {
1297                transform(input.as_mut(), seen_tees);
1298            }
1299        }
1300    }
1301
1302    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1303        match self {
1304            HydroNode::Placeholder => HydroNode::Placeholder,
1305            HydroNode::Source { source, metadata } => HydroNode::Source {
1306                source: source.clone(),
1307                metadata: metadata.clone(),
1308            },
1309            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1310                ident: ident.clone(),
1311                metadata: metadata.clone(),
1312            },
1313            HydroNode::Tee { inner, metadata } => {
1314                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1315                    HydroNode::Tee {
1316                        inner: TeeNode(transformed.clone()),
1317                        metadata: metadata.clone(),
1318                    }
1319                } else {
1320                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1321                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1322                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1323                    *new_rc.borrow_mut() = cloned;
1324                    HydroNode::Tee {
1325                        inner: TeeNode(new_rc),
1326                        metadata: metadata.clone(),
1327                    }
1328                }
1329            }
1330            HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1331                inner: Box::new(inner.deep_clone(seen_tees)),
1332                metadata: metadata.clone(),
1333            },
1334            HydroNode::Unpersist { inner, metadata } => HydroNode::Unpersist {
1335                inner: Box::new(inner.deep_clone(seen_tees)),
1336                metadata: metadata.clone(),
1337            },
1338            HydroNode::Delta { inner, metadata } => HydroNode::Delta {
1339                inner: Box::new(inner.deep_clone(seen_tees)),
1340                metadata: metadata.clone(),
1341            },
1342            HydroNode::Chain {
1343                first,
1344                second,
1345                metadata,
1346            } => HydroNode::Chain {
1347                first: Box::new(first.deep_clone(seen_tees)),
1348                second: Box::new(second.deep_clone(seen_tees)),
1349                metadata: metadata.clone(),
1350            },
1351            HydroNode::CrossProduct {
1352                left,
1353                right,
1354                metadata,
1355            } => HydroNode::CrossProduct {
1356                left: Box::new(left.deep_clone(seen_tees)),
1357                right: Box::new(right.deep_clone(seen_tees)),
1358                metadata: metadata.clone(),
1359            },
1360            HydroNode::CrossSingleton {
1361                left,
1362                right,
1363                metadata,
1364            } => HydroNode::CrossSingleton {
1365                left: Box::new(left.deep_clone(seen_tees)),
1366                right: Box::new(right.deep_clone(seen_tees)),
1367                metadata: metadata.clone(),
1368            },
1369            HydroNode::Join {
1370                left,
1371                right,
1372                metadata,
1373            } => HydroNode::Join {
1374                left: Box::new(left.deep_clone(seen_tees)),
1375                right: Box::new(right.deep_clone(seen_tees)),
1376                metadata: metadata.clone(),
1377            },
1378            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1379                pos: Box::new(pos.deep_clone(seen_tees)),
1380                neg: Box::new(neg.deep_clone(seen_tees)),
1381                metadata: metadata.clone(),
1382            },
1383            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1384                pos: Box::new(pos.deep_clone(seen_tees)),
1385                neg: Box::new(neg.deep_clone(seen_tees)),
1386                metadata: metadata.clone(),
1387            },
1388            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1389                input: Box::new(input.deep_clone(seen_tees)),
1390                metadata: metadata.clone(),
1391            },
1392            HydroNode::ResolveFuturesOrdered { input, metadata } => {
1393                HydroNode::ResolveFuturesOrdered {
1394                    input: Box::new(input.deep_clone(seen_tees)),
1395                    metadata: metadata.clone(),
1396                }
1397            }
1398            HydroNode::Map { f, input, metadata } => HydroNode::Map {
1399                f: f.clone(),
1400                input: Box::new(input.deep_clone(seen_tees)),
1401                metadata: metadata.clone(),
1402            },
1403            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1404                f: f.clone(),
1405                input: Box::new(input.deep_clone(seen_tees)),
1406                metadata: metadata.clone(),
1407            },
1408            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1409                f: f.clone(),
1410                input: Box::new(input.deep_clone(seen_tees)),
1411                metadata: metadata.clone(),
1412            },
1413            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1414                f: f.clone(),
1415                input: Box::new(input.deep_clone(seen_tees)),
1416                metadata: metadata.clone(),
1417            },
1418            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1419                input: Box::new(input.deep_clone(seen_tees)),
1420                metadata: metadata.clone(),
1421            },
1422            HydroNode::Enumerate {
1423                is_static,
1424                input,
1425                metadata,
1426            } => HydroNode::Enumerate {
1427                is_static: *is_static,
1428                input: Box::new(input.deep_clone(seen_tees)),
1429                metadata: metadata.clone(),
1430            },
1431            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1432                f: f.clone(),
1433                input: Box::new(input.deep_clone(seen_tees)),
1434                metadata: metadata.clone(),
1435            },
1436            HydroNode::Unique { input, metadata } => HydroNode::Unique {
1437                input: Box::new(input.deep_clone(seen_tees)),
1438                metadata: metadata.clone(),
1439            },
1440            HydroNode::Sort { input, metadata } => HydroNode::Sort {
1441                input: Box::new(input.deep_clone(seen_tees)),
1442                metadata: metadata.clone(),
1443            },
1444            HydroNode::Fold {
1445                init,
1446                acc,
1447                input,
1448                metadata,
1449            } => HydroNode::Fold {
1450                init: init.clone(),
1451                acc: acc.clone(),
1452                input: Box::new(input.deep_clone(seen_tees)),
1453                metadata: metadata.clone(),
1454            },
1455            HydroNode::Scan {
1456                init,
1457                acc,
1458                input,
1459                metadata,
1460            } => HydroNode::Scan {
1461                init: init.clone(),
1462                acc: acc.clone(),
1463                input: Box::new(input.deep_clone(seen_tees)),
1464                metadata: metadata.clone(),
1465            },
1466            HydroNode::FoldKeyed {
1467                init,
1468                acc,
1469                input,
1470                metadata,
1471            } => HydroNode::FoldKeyed {
1472                init: init.clone(),
1473                acc: acc.clone(),
1474                input: Box::new(input.deep_clone(seen_tees)),
1475                metadata: metadata.clone(),
1476            },
1477            HydroNode::ReduceKeyedWatermark {
1478                f,
1479                input,
1480                watermark,
1481                metadata,
1482            } => HydroNode::ReduceKeyedWatermark {
1483                f: f.clone(),
1484                input: Box::new(input.deep_clone(seen_tees)),
1485                watermark: Box::new(watermark.deep_clone(seen_tees)),
1486                metadata: metadata.clone(),
1487            },
1488            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1489                f: f.clone(),
1490                input: Box::new(input.deep_clone(seen_tees)),
1491                metadata: metadata.clone(),
1492            },
1493            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1494                f: f.clone(),
1495                input: Box::new(input.deep_clone(seen_tees)),
1496                metadata: metadata.clone(),
1497            },
1498            HydroNode::Network {
1499                serialize_fn,
1500                instantiate_fn,
1501                deserialize_fn,
1502                input,
1503                metadata,
1504            } => HydroNode::Network {
1505                serialize_fn: serialize_fn.clone(),
1506                instantiate_fn: instantiate_fn.clone(),
1507                deserialize_fn: deserialize_fn.clone(),
1508                input: Box::new(input.deep_clone(seen_tees)),
1509                metadata: metadata.clone(),
1510            },
1511            HydroNode::ExternalInput {
1512                from_external_id,
1513                from_key,
1514                from_many,
1515                codec_type,
1516                port_hint,
1517                instantiate_fn,
1518                deserialize_fn,
1519                metadata,
1520            } => HydroNode::ExternalInput {
1521                from_external_id: *from_external_id,
1522                from_key: *from_key,
1523                from_many: *from_many,
1524                codec_type: codec_type.clone(),
1525                port_hint: *port_hint,
1526                instantiate_fn: instantiate_fn.clone(),
1527                deserialize_fn: deserialize_fn.clone(),
1528                metadata: metadata.clone(),
1529            },
1530            HydroNode::Counter {
1531                tag,
1532                duration,
1533                input,
1534                metadata,
1535            } => HydroNode::Counter {
1536                tag: tag.clone(),
1537                duration: duration.clone(),
1538                input: Box::new(input.deep_clone(seen_tees)),
1539                metadata: metadata.clone(),
1540            },
1541        }
1542    }
1543
1544    #[cfg(feature = "build")]
1545    pub fn emit_core(
1546        &mut self,
1547        builders_or_callback: &mut BuildersOrCallback<
1548            impl FnMut(&mut HydroLeaf, &mut usize),
1549            impl FnMut(&mut HydroNode, &mut usize),
1550        >,
1551        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1552        next_stmt_id: &mut usize,
1553    ) -> (syn::Ident, usize) {
1554        match self {
1555            HydroNode::Placeholder => {
1556                panic!()
1557            }
1558
1559            HydroNode::Persist { inner, .. } => {
1560                let (inner_ident, location) =
1561                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1562
1563                let persist_ident =
1564                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1565
1566                match builders_or_callback {
1567                    BuildersOrCallback::Builders(graph_builders) => {
1568                        let builder = graph_builders.entry(location).or_default();
1569                        builder.add_dfir(
1570                            parse_quote! {
1571                                #persist_ident = #inner_ident -> persist::<'static>();
1572                            },
1573                            None,
1574                            Some(&next_stmt_id.to_string()),
1575                        );
1576                    }
1577                    BuildersOrCallback::Callback(_, node_callback) => {
1578                        node_callback(self, next_stmt_id);
1579                    }
1580                }
1581
1582                *next_stmt_id += 1;
1583
1584                (persist_ident, location)
1585            }
1586
1587            HydroNode::Unpersist { .. } => {
1588                panic!(
1589                    "Unpersist is a marker node and should have been optimized away. This is likely a compiler bug."
1590                )
1591            }
1592
1593            HydroNode::Delta { inner, .. } => {
1594                let (inner_ident, location) =
1595                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1596
1597                let delta_ident =
1598                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1599
1600                match builders_or_callback {
1601                    BuildersOrCallback::Builders(graph_builders) => {
1602                        let builder = graph_builders.entry(location).or_default();
1603                        builder.add_dfir(
1604                            parse_quote! {
1605                                #delta_ident = #inner_ident -> multiset_delta();
1606                            },
1607                            None,
1608                            Some(&next_stmt_id.to_string()),
1609                        );
1610                    }
1611                    BuildersOrCallback::Callback(_, node_callback) => {
1612                        node_callback(self, next_stmt_id);
1613                    }
1614                }
1615
1616                *next_stmt_id += 1;
1617
1618                (delta_ident, location)
1619            }
1620
1621            HydroNode::Source {
1622                source, metadata, ..
1623            } => {
1624                let location_id = metadata.location_kind.root().raw_id();
1625
1626                if let HydroSource::ExternalNetwork() = source {
1627                    (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1628                } else {
1629                    let source_ident =
1630                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1631
1632                    let source_stmt = match source {
1633                        HydroSource::Stream(expr) => {
1634                            parse_quote! {
1635                                #source_ident = source_stream(#expr);
1636                            }
1637                        }
1638
1639                        HydroSource::ExternalNetwork() => {
1640                            unreachable!()
1641                        }
1642
1643                        HydroSource::Iter(expr) => {
1644                            parse_quote! {
1645                                #source_ident = source_iter(#expr);
1646                            }
1647                        }
1648
1649                        HydroSource::Spin() => {
1650                            parse_quote! {
1651                                #source_ident = spin();
1652                            }
1653                        }
1654                    };
1655
1656                    match builders_or_callback {
1657                        BuildersOrCallback::Builders(graph_builders) => {
1658                            let builder = graph_builders.entry(location_id).or_default();
1659                            builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1660                        }
1661                        BuildersOrCallback::Callback(_, node_callback) => {
1662                            node_callback(self, next_stmt_id);
1663                        }
1664                    }
1665
1666                    *next_stmt_id += 1;
1667
1668                    (source_ident, location_id)
1669                }
1670            }
1671
1672            HydroNode::CycleSource {
1673                ident, metadata, ..
1674            } => {
1675                let location_id = metadata.location_kind.root().raw_id();
1676
1677                let ident = ident.clone();
1678
1679                match builders_or_callback {
1680                    BuildersOrCallback::Builders(_) => {}
1681                    BuildersOrCallback::Callback(_, node_callback) => {
1682                        node_callback(self, next_stmt_id);
1683                    }
1684                }
1685
1686                // consume a stmt id even though we did not emit anything so that we can instrument this
1687                *next_stmt_id += 1;
1688
1689                (ident, location_id)
1690            }
1691
1692            HydroNode::Tee { inner, .. } => {
1693                let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1694                    built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1695                {
1696                    match builders_or_callback {
1697                        BuildersOrCallback::Builders(_) => {}
1698                        BuildersOrCallback::Callback(_, node_callback) => {
1699                            node_callback(self, next_stmt_id);
1700                        }
1701                    }
1702
1703                    (teed_from.clone(), *inner_location_id)
1704                } else {
1705                    let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1706                        builders_or_callback,
1707                        built_tees,
1708                        next_stmt_id,
1709                    );
1710
1711                    let tee_ident =
1712                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1713
1714                    built_tees.insert(
1715                        inner.0.as_ref() as *const RefCell<HydroNode>,
1716                        (tee_ident.clone(), inner_location_id),
1717                    );
1718
1719                    match builders_or_callback {
1720                        BuildersOrCallback::Builders(graph_builders) => {
1721                            let builder = graph_builders.entry(inner_location_id).or_default();
1722                            builder.add_dfir(
1723                                parse_quote! {
1724                                    #tee_ident = #inner_ident -> tee();
1725                                },
1726                                None,
1727                                Some(&next_stmt_id.to_string()),
1728                            );
1729                        }
1730                        BuildersOrCallback::Callback(_, node_callback) => {
1731                            node_callback(self, next_stmt_id);
1732                        }
1733                    }
1734
1735                    (tee_ident, inner_location_id)
1736                };
1737
1738                // we consume a stmt id regardless of if we emit the tee() operator,
1739                // so that during rewrites we touch all recipients of the tee()
1740
1741                *next_stmt_id += 1;
1742                (ret_ident, inner_location_id)
1743            }
1744
1745            HydroNode::Chain { first, second, .. } => {
1746                let (first_ident, first_location_id) =
1747                    first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1748                let (second_ident, second_location_id) =
1749                    second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1750
1751                let chain_ident =
1752                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1753
1754                match builders_or_callback {
1755                    BuildersOrCallback::Builders(graph_builders) => {
1756                        assert_eq!(
1757                            first_location_id, second_location_id,
1758                            "chain inputs must be in the same location"
1759                        );
1760                        let builder = graph_builders.entry(first_location_id).or_default();
1761                        builder.add_dfir(
1762                            parse_quote! {
1763                                #chain_ident = chain();
1764                                #first_ident -> [0]#chain_ident;
1765                                #second_ident -> [1]#chain_ident;
1766                            },
1767                            None,
1768                            Some(&next_stmt_id.to_string()),
1769                        );
1770                    }
1771                    BuildersOrCallback::Callback(_, node_callback) => {
1772                        node_callback(self, next_stmt_id);
1773                    }
1774                }
1775
1776                *next_stmt_id += 1;
1777
1778                (chain_ident, first_location_id)
1779            }
1780
1781            HydroNode::CrossSingleton { left, right, .. } => {
1782                let (left_ident, left_location_id) =
1783                    left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1784                let (right_ident, right_location_id) =
1785                    right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1786
1787                let cross_ident =
1788                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1789
1790                match builders_or_callback {
1791                    BuildersOrCallback::Builders(graph_builders) => {
1792                        assert_eq!(
1793                            left_location_id, right_location_id,
1794                            "cross_singleton inputs must be in the same location"
1795                        );
1796
1797                        let builder = graph_builders.entry(left_location_id).or_default();
1798                        builder.add_dfir(
1799                            parse_quote! {
1800                                #cross_ident = cross_singleton();
1801                                #left_ident -> [input]#cross_ident;
1802                                #right_ident -> [single]#cross_ident;
1803                            },
1804                            None,
1805                            Some(&next_stmt_id.to_string()),
1806                        );
1807                    }
1808                    BuildersOrCallback::Callback(_, node_callback) => {
1809                        node_callback(self, next_stmt_id);
1810                    }
1811                }
1812
1813                *next_stmt_id += 1;
1814
1815                (cross_ident, left_location_id)
1816            }
1817
1818            HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1819                let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1820                    parse_quote!(cross_join_multiset)
1821                } else {
1822                    parse_quote!(join_multiset)
1823                };
1824
1825                let (HydroNode::CrossProduct { left, right, .. }
1826                | HydroNode::Join { left, right, .. }) = self
1827                else {
1828                    unreachable!()
1829                };
1830
1831                let (left_inner, left_lifetime) =
1832                    if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1833                        (left, quote!('static))
1834                    } else {
1835                        (left, quote!('tick))
1836                    };
1837
1838                let (right_inner, right_lifetime) =
1839                    if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
1840                        (right, quote!('static))
1841                    } else {
1842                        (right, quote!('tick))
1843                    };
1844
1845                let (left_ident, left_location_id) =
1846                    left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1847                let (right_ident, right_location_id) =
1848                    right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1849
1850                let stream_ident =
1851                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1852
1853                match builders_or_callback {
1854                    BuildersOrCallback::Builders(graph_builders) => {
1855                        assert_eq!(
1856                            left_location_id, right_location_id,
1857                            "join / cross product inputs must be in the same location"
1858                        );
1859
1860                        let builder = graph_builders.entry(left_location_id).or_default();
1861                        builder.add_dfir(
1862                            parse_quote! {
1863                                #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
1864                                #left_ident -> [0]#stream_ident;
1865                                #right_ident -> [1]#stream_ident;
1866                            },
1867                            None,
1868                            Some(&next_stmt_id.to_string()),
1869                        );
1870                    }
1871                    BuildersOrCallback::Callback(_, node_callback) => {
1872                        node_callback(self, next_stmt_id);
1873                    }
1874                }
1875
1876                *next_stmt_id += 1;
1877
1878                (stream_ident, left_location_id)
1879            }
1880
1881            HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
1882                let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
1883                    parse_quote!(difference_multiset)
1884                } else {
1885                    parse_quote!(anti_join_multiset)
1886                };
1887
1888                let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
1889                    self
1890                else {
1891                    unreachable!()
1892                };
1893
1894                let (neg, neg_lifetime) =
1895                    if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
1896                        (neg, quote!('static))
1897                    } else {
1898                        (neg, quote!('tick))
1899                    };
1900
1901                let (pos_ident, pos_location_id) =
1902                    pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
1903                let (neg_ident, neg_location_id) =
1904                    neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
1905
1906                let stream_ident =
1907                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1908
1909                match builders_or_callback {
1910                    BuildersOrCallback::Builders(graph_builders) => {
1911                        assert_eq!(
1912                            pos_location_id, neg_location_id,
1913                            "difference / anti join inputs must be in the same location"
1914                        );
1915
1916                        let builder = graph_builders.entry(pos_location_id).or_default();
1917                        builder.add_dfir(
1918                            parse_quote! {
1919                                #stream_ident = #operator::<'tick, #neg_lifetime>();
1920                                #pos_ident -> [pos]#stream_ident;
1921                                #neg_ident -> [neg]#stream_ident;
1922                            },
1923                            None,
1924                            Some(&next_stmt_id.to_string()),
1925                        );
1926                    }
1927                    BuildersOrCallback::Callback(_, node_callback) => {
1928                        node_callback(self, next_stmt_id);
1929                    }
1930                }
1931
1932                *next_stmt_id += 1;
1933
1934                (stream_ident, pos_location_id)
1935            }
1936
1937            HydroNode::ResolveFutures { input, .. } => {
1938                let (input_ident, input_location_id) =
1939                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1940
1941                let futures_ident =
1942                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1943
1944                match builders_or_callback {
1945                    BuildersOrCallback::Builders(graph_builders) => {
1946                        let builder = graph_builders.entry(input_location_id).or_default();
1947                        builder.add_dfir(
1948                            parse_quote! {
1949                                #futures_ident = #input_ident -> resolve_futures();
1950                            },
1951                            None,
1952                            Some(&next_stmt_id.to_string()),
1953                        );
1954                    }
1955                    BuildersOrCallback::Callback(_, node_callback) => {
1956                        node_callback(self, next_stmt_id);
1957                    }
1958                }
1959
1960                *next_stmt_id += 1;
1961
1962                (futures_ident, input_location_id)
1963            }
1964
1965            HydroNode::ResolveFuturesOrdered { input, .. } => {
1966                let (input_ident, input_location_id) =
1967                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1968
1969                let futures_ident =
1970                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1971
1972                match builders_or_callback {
1973                    BuildersOrCallback::Builders(graph_builders) => {
1974                        let builder = graph_builders.entry(input_location_id).or_default();
1975                        builder.add_dfir(
1976                            parse_quote! {
1977                                #futures_ident = #input_ident -> resolve_futures_ordered();
1978                            },
1979                            None,
1980                            Some(&next_stmt_id.to_string()),
1981                        );
1982                    }
1983                    BuildersOrCallback::Callback(_, node_callback) => {
1984                        node_callback(self, next_stmt_id);
1985                    }
1986                }
1987
1988                *next_stmt_id += 1;
1989
1990                (futures_ident, input_location_id)
1991            }
1992
1993            HydroNode::Map { f, input, .. } => {
1994                let (input_ident, input_location_id) =
1995                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1996
1997                let map_ident =
1998                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1999
2000                match builders_or_callback {
2001                    BuildersOrCallback::Builders(graph_builders) => {
2002                        let builder = graph_builders.entry(input_location_id).or_default();
2003                        builder.add_dfir(
2004                            parse_quote! {
2005                                #map_ident = #input_ident -> map(#f);
2006                            },
2007                            None,
2008                            Some(&next_stmt_id.to_string()),
2009                        );
2010                    }
2011                    BuildersOrCallback::Callback(_, node_callback) => {
2012                        node_callback(self, next_stmt_id);
2013                    }
2014                }
2015
2016                *next_stmt_id += 1;
2017
2018                (map_ident, input_location_id)
2019            }
2020
2021            HydroNode::FlatMap { f, input, .. } => {
2022                let (input_ident, input_location_id) =
2023                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2024
2025                let flat_map_ident =
2026                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2027
2028                match builders_or_callback {
2029                    BuildersOrCallback::Builders(graph_builders) => {
2030                        let builder = graph_builders.entry(input_location_id).or_default();
2031                        builder.add_dfir(
2032                            parse_quote! {
2033                                #flat_map_ident = #input_ident -> flat_map(#f);
2034                            },
2035                            None,
2036                            Some(&next_stmt_id.to_string()),
2037                        );
2038                    }
2039                    BuildersOrCallback::Callback(_, node_callback) => {
2040                        node_callback(self, next_stmt_id);
2041                    }
2042                }
2043
2044                *next_stmt_id += 1;
2045
2046                (flat_map_ident, input_location_id)
2047            }
2048
2049            HydroNode::Filter { f, input, .. } => {
2050                let (input_ident, input_location_id) =
2051                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2052
2053                let filter_ident =
2054                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2055
2056                match builders_or_callback {
2057                    BuildersOrCallback::Builders(graph_builders) => {
2058                        let builder = graph_builders.entry(input_location_id).or_default();
2059                        builder.add_dfir(
2060                            parse_quote! {
2061                                #filter_ident = #input_ident -> filter(#f);
2062                            },
2063                            None,
2064                            Some(&next_stmt_id.to_string()),
2065                        );
2066                    }
2067                    BuildersOrCallback::Callback(_, node_callback) => {
2068                        node_callback(self, next_stmt_id);
2069                    }
2070                }
2071
2072                *next_stmt_id += 1;
2073
2074                (filter_ident, input_location_id)
2075            }
2076
2077            HydroNode::FilterMap { f, input, .. } => {
2078                let (input_ident, input_location_id) =
2079                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2080
2081                let filter_map_ident =
2082                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2083
2084                match builders_or_callback {
2085                    BuildersOrCallback::Builders(graph_builders) => {
2086                        let builder = graph_builders.entry(input_location_id).or_default();
2087                        builder.add_dfir(
2088                            parse_quote! {
2089                                #filter_map_ident = #input_ident -> filter_map(#f);
2090                            },
2091                            None,
2092                            Some(&next_stmt_id.to_string()),
2093                        );
2094                    }
2095                    BuildersOrCallback::Callback(_, node_callback) => {
2096                        node_callback(self, next_stmt_id);
2097                    }
2098                }
2099
2100                *next_stmt_id += 1;
2101
2102                (filter_map_ident, input_location_id)
2103            }
2104
2105            HydroNode::Sort { input, .. } => {
2106                let (input_ident, input_location_id) =
2107                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2108
2109                let sort_ident =
2110                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2111
2112                match builders_or_callback {
2113                    BuildersOrCallback::Builders(graph_builders) => {
2114                        let builder = graph_builders.entry(input_location_id).or_default();
2115                        builder.add_dfir(
2116                            parse_quote! {
2117                                #sort_ident = #input_ident -> sort();
2118                            },
2119                            None,
2120                            Some(&next_stmt_id.to_string()),
2121                        );
2122                    }
2123                    BuildersOrCallback::Callback(_, node_callback) => {
2124                        node_callback(self, next_stmt_id);
2125                    }
2126                }
2127
2128                *next_stmt_id += 1;
2129
2130                (sort_ident, input_location_id)
2131            }
2132
2133            HydroNode::DeferTick { input, .. } => {
2134                let (input_ident, input_location_id) =
2135                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2136
2137                let defer_tick_ident =
2138                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2139
2140                match builders_or_callback {
2141                    BuildersOrCallback::Builders(graph_builders) => {
2142                        let builder = graph_builders.entry(input_location_id).or_default();
2143                        builder.add_dfir(
2144                            parse_quote! {
2145                                #defer_tick_ident = #input_ident -> defer_tick_lazy();
2146                            },
2147                            None,
2148                            Some(&next_stmt_id.to_string()),
2149                        );
2150                    }
2151                    BuildersOrCallback::Callback(_, node_callback) => {
2152                        node_callback(self, next_stmt_id);
2153                    }
2154                }
2155
2156                *next_stmt_id += 1;
2157
2158                (defer_tick_ident, input_location_id)
2159            }
2160
2161            HydroNode::Enumerate {
2162                is_static, input, ..
2163            } => {
2164                let (input_ident, input_location_id) =
2165                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2166
2167                let enumerate_ident =
2168                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2169
2170                match builders_or_callback {
2171                    BuildersOrCallback::Builders(graph_builders) => {
2172                        let builder = graph_builders.entry(input_location_id).or_default();
2173                        let lifetime = if *is_static {
2174                            quote!('static)
2175                        } else {
2176                            quote!('tick)
2177                        };
2178                        builder.add_dfir(
2179                            parse_quote! {
2180                                #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2181                            },
2182                            None,
2183                            Some(&next_stmt_id.to_string()),
2184                        );
2185                    }
2186                    BuildersOrCallback::Callback(_, node_callback) => {
2187                        node_callback(self, next_stmt_id);
2188                    }
2189                }
2190
2191                *next_stmt_id += 1;
2192
2193                (enumerate_ident, input_location_id)
2194            }
2195
2196            HydroNode::Inspect { f, input, .. } => {
2197                let (input_ident, input_location_id) =
2198                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2199
2200                let inspect_ident =
2201                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2202
2203                match builders_or_callback {
2204                    BuildersOrCallback::Builders(graph_builders) => {
2205                        let builder = graph_builders.entry(input_location_id).or_default();
2206                        builder.add_dfir(
2207                            parse_quote! {
2208                                #inspect_ident = #input_ident -> inspect(#f);
2209                            },
2210                            None,
2211                            Some(&next_stmt_id.to_string()),
2212                        );
2213                    }
2214                    BuildersOrCallback::Callback(_, node_callback) => {
2215                        node_callback(self, next_stmt_id);
2216                    }
2217                }
2218
2219                *next_stmt_id += 1;
2220
2221                (inspect_ident, input_location_id)
2222            }
2223
2224            HydroNode::Unique { input, .. } => {
2225                let (input_ident, input_location_id) =
2226                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2227
2228                let unique_ident =
2229                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2230
2231                match builders_or_callback {
2232                    BuildersOrCallback::Builders(graph_builders) => {
2233                        let builder = graph_builders.entry(input_location_id).or_default();
2234                        builder.add_dfir(
2235                            parse_quote! {
2236                                #unique_ident = #input_ident -> unique::<'tick>();
2237                            },
2238                            None,
2239                            Some(&next_stmt_id.to_string()),
2240                        );
2241                    }
2242                    BuildersOrCallback::Callback(_, node_callback) => {
2243                        node_callback(self, next_stmt_id);
2244                    }
2245                }
2246
2247                *next_stmt_id += 1;
2248
2249                (unique_ident, input_location_id)
2250            }
2251
2252            HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2253                let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2254                    parse_quote!(fold)
2255                } else if matches!(self, HydroNode::Scan { .. }) {
2256                    parse_quote!(scan)
2257                } else {
2258                    parse_quote!(fold_keyed)
2259                };
2260
2261                let (HydroNode::Fold {
2262                    init, acc, input, ..
2263                }
2264                | HydroNode::FoldKeyed {
2265                    init, acc, input, ..
2266                }
2267                | HydroNode::Scan {
2268                    init, acc, input, ..
2269                }) = self
2270                else {
2271                    unreachable!()
2272                };
2273
2274                let (input, lifetime) =
2275                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2276                        (input, quote!('static))
2277                    } else {
2278                        (input, quote!('tick))
2279                    };
2280
2281                let (input_ident, input_location_id) =
2282                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2283
2284                let fold_ident =
2285                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2286
2287                match builders_or_callback {
2288                    BuildersOrCallback::Builders(graph_builders) => {
2289                        let builder = graph_builders.entry(input_location_id).or_default();
2290                        builder.add_dfir(
2291                            parse_quote! {
2292                                #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
2293                            },
2294                            None,
2295                            Some(&next_stmt_id.to_string()),
2296                        );
2297                    }
2298                    BuildersOrCallback::Callback(_, node_callback) => {
2299                        node_callback(self, next_stmt_id);
2300                    }
2301                }
2302
2303                *next_stmt_id += 1;
2304
2305                (fold_ident, input_location_id)
2306            }
2307
2308            HydroNode::ReduceKeyedWatermark {
2309                f,
2310                input,
2311                watermark,
2312                ..
2313            } => {
2314                let (input, lifetime) =
2315                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2316                        (input, quote!('static))
2317                    } else {
2318                        (input, quote!('tick))
2319                    };
2320
2321                let (input_ident, input_location_id) =
2322                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2323
2324                let (watermark_ident, watermark_location_id) =
2325                    watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
2326
2327                let chain_ident = syn::Ident::new(
2328                    &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
2329                    Span::call_site(),
2330                );
2331
2332                let fold_ident =
2333                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2334
2335                match builders_or_callback {
2336                    BuildersOrCallback::Builders(graph_builders) => {
2337                        assert_eq!(
2338                            input_location_id, watermark_location_id,
2339                            "ReduceKeyedWatermark inputs must be in the same location"
2340                        );
2341
2342                        let builder = graph_builders.entry(input_location_id).or_default();
2343                        // 1. Don't allow any values to be added to the map if the key <= the watermark.
2344                        // 2. If the entry didn't exist in the HashMap, add it. Otherwise, call f.
2345                        //    When the watermark advances (or is first set), delete all HashMap entries with a key <= the watermark.
2346                        // 3. Convert the HashMap back into a stream of (k, v).
2347                        builder.add_dfir(
2348                            parse_quote! {
2349                                #chain_ident = chain();
2350                                #input_ident
2351                                    -> map(|x| (Some(x), None))
2352                                    -> [0]#chain_ident;
2353                                #watermark_ident
2354                                    -> map(|watermark| (None, Some(watermark)))
2355                                    -> [1]#chain_ident;
2356
2357                                #fold_ident = #chain_ident
2358                                    -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
2359                                        let __reduce_keyed_fn = #f;
2360                                        move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
2361                                            if let Some((k, v)) = opt_payload {
2362                                                if let Some(curr_watermark) = *opt_curr_watermark {
2363                                                    if k <= curr_watermark {
2364                                                        return;
2365                                                    }
2366                                                }
2367                                                match map.entry(k) {
2368                                                    ::std::collections::hash_map::Entry::Vacant(e) => {
2369                                                        e.insert(v);
2370                                                    }
2371                                                    ::std::collections::hash_map::Entry::Occupied(mut e) => {
2372                                                        __reduce_keyed_fn(e.get_mut(), v);
2373                                                    }
2374                                                }
2375                                            } else {
2376                                                let watermark = opt_watermark.unwrap();
2377                                                if let Some(curr_watermark) = *opt_curr_watermark {
2378                                                    if watermark <= curr_watermark {
2379                                                        return;
2380                                                    }
2381                                                }
2382                                                *opt_curr_watermark = opt_watermark;
2383                                                map.retain(|k, _| *k > watermark);
2384                                            }
2385                                        }
2386                                    })
2387                                    -> flat_map(|(map, _curr_watermark)| map);
2388                            },
2389                            None,
2390                            Some(&next_stmt_id.to_string()),
2391                        );
2392                    }
2393                    BuildersOrCallback::Callback(_, node_callback) => {
2394                        node_callback(self, next_stmt_id);
2395                    }
2396                }
2397
2398                *next_stmt_id += 1;
2399
2400                (fold_ident, input_location_id)
2401            }
2402
2403            HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
2404                let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
2405                    parse_quote!(reduce)
2406                } else {
2407                    parse_quote!(reduce_keyed)
2408                };
2409
2410                let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
2411                    self
2412                else {
2413                    unreachable!()
2414                };
2415
2416                let (input, lifetime) =
2417                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2418                        (input, quote!('static))
2419                    } else {
2420                        (input, quote!('tick))
2421                    };
2422
2423                let (input_ident, input_location_id) =
2424                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2425
2426                let reduce_ident =
2427                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2428
2429                match builders_or_callback {
2430                    BuildersOrCallback::Builders(graph_builders) => {
2431                        let builder = graph_builders.entry(input_location_id).or_default();
2432                        builder.add_dfir(
2433                            parse_quote! {
2434                                #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
2435                            },
2436                            None,
2437                            Some(&next_stmt_id.to_string()),
2438                        );
2439                    }
2440                    BuildersOrCallback::Callback(_, node_callback) => {
2441                        node_callback(self, next_stmt_id);
2442                    }
2443                }
2444
2445                *next_stmt_id += 1;
2446
2447                (reduce_ident, input_location_id)
2448            }
2449
2450            HydroNode::Network {
2451                serialize_fn: serialize_pipeline,
2452                instantiate_fn,
2453                deserialize_fn: deserialize_pipeline,
2454                input,
2455                metadata,
2456                ..
2457            } => {
2458                let (input_ident, input_location_id) =
2459                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2460
2461                let to_id = metadata.location_kind.root().raw_id();
2462
2463                let receiver_stream_ident =
2464                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2465
2466                match builders_or_callback {
2467                    BuildersOrCallback::Builders(graph_builders) => {
2468                        let (sink_expr, source_expr) = match instantiate_fn {
2469                            DebugInstantiate::Building => (
2470                                syn::parse_quote!(DUMMY_SINK),
2471                                syn::parse_quote!(DUMMY_SOURCE),
2472                            ),
2473
2474                            DebugInstantiate::Finalized(finalized) => {
2475                                (finalized.sink.clone(), finalized.source.clone())
2476                            }
2477                        };
2478
2479                        let sender_builder = graph_builders.entry(input_location_id).or_default();
2480                        if let Some(serialize_pipeline) = serialize_pipeline {
2481                            sender_builder.add_dfir(
2482                                parse_quote! {
2483                                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
2484                                },
2485                                None,
2486                                // operator tag separates send and receive, which otherwise have the same next_stmt_id
2487                                Some(&format!("send{}", next_stmt_id)),
2488                            );
2489                        } else {
2490                            sender_builder.add_dfir(
2491                                parse_quote! {
2492                                    #input_ident -> dest_sink(#sink_expr);
2493                                },
2494                                None,
2495                                Some(&format!("send{}", next_stmt_id)),
2496                            );
2497                        }
2498
2499                        let receiver_builder = graph_builders.entry(to_id).or_default();
2500                        if let Some(deserialize_pipeline) = deserialize_pipeline {
2501                            receiver_builder.add_dfir(parse_quote! {
2502                                #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2503                            }, None, Some(&format!("recv{}", next_stmt_id)));
2504                        } else {
2505                            receiver_builder.add_dfir(
2506                                parse_quote! {
2507                                    #receiver_stream_ident = source_stream(#source_expr);
2508                                },
2509                                None,
2510                                Some(&format!("recv{}", next_stmt_id)),
2511                            );
2512                        }
2513                    }
2514                    BuildersOrCallback::Callback(_, node_callback) => {
2515                        node_callback(self, next_stmt_id);
2516                    }
2517                }
2518
2519                *next_stmt_id += 1;
2520
2521                (receiver_stream_ident, to_id)
2522            }
2523
2524            HydroNode::ExternalInput {
2525                instantiate_fn,
2526                deserialize_fn: deserialize_pipeline,
2527                metadata,
2528                ..
2529            } => {
2530                let to_id = metadata.location_kind.root().raw_id();
2531
2532                let receiver_stream_ident =
2533                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2534
2535                match builders_or_callback {
2536                    BuildersOrCallback::Builders(graph_builders) => {
2537                        let (_, source_expr) = match instantiate_fn {
2538                            DebugInstantiate::Building => (
2539                                syn::parse_quote!(DUMMY_SINK),
2540                                syn::parse_quote!(DUMMY_SOURCE),
2541                            ),
2542
2543                            DebugInstantiate::Finalized(finalized) => {
2544                                (finalized.sink.clone(), finalized.source.clone())
2545                            }
2546                        };
2547
2548                        let receiver_builder = graph_builders.entry(to_id).or_default();
2549                        if let Some(deserialize_pipeline) = deserialize_pipeline {
2550                            receiver_builder.add_dfir(parse_quote! {
2551                                #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2552                            }, None, Some(&format!("recv{}", next_stmt_id)));
2553                        } else {
2554                            receiver_builder.add_dfir(
2555                                parse_quote! {
2556                                    #receiver_stream_ident = source_stream(#source_expr);
2557                                },
2558                                None,
2559                                Some(&format!("recv{}", next_stmt_id)),
2560                            );
2561                        }
2562                    }
2563                    BuildersOrCallback::Callback(_, node_callback) => {
2564                        node_callback(self, next_stmt_id);
2565                    }
2566                }
2567
2568                *next_stmt_id += 1;
2569
2570                (receiver_stream_ident, to_id)
2571            }
2572
2573            HydroNode::Counter {
2574                tag,
2575                duration,
2576                input,
2577                ..
2578            } => {
2579                let (input_ident, input_location_id) =
2580                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2581
2582                let counter_ident =
2583                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2584
2585                match builders_or_callback {
2586                    BuildersOrCallback::Builders(graph_builders) => {
2587                        let builder = graph_builders.entry(input_location_id).or_default();
2588                        builder.add_dfir(
2589                            parse_quote! {
2590                                #counter_ident = #input_ident -> _counter(#tag, #duration);
2591                            },
2592                            None,
2593                            Some(&next_stmt_id.to_string()),
2594                        );
2595                    }
2596                    BuildersOrCallback::Callback(_, node_callback) => {
2597                        node_callback(self, next_stmt_id);
2598                    }
2599                }
2600
2601                *next_stmt_id += 1;
2602
2603                (counter_ident, input_location_id)
2604            }
2605        }
2606    }
2607
2608    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
2609        match self {
2610            HydroNode::Placeholder => {
2611                panic!()
2612            }
2613            HydroNode::Source { source, .. } => match source {
2614                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
2615                HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
2616            },
2617            HydroNode::CycleSource { .. }
2618            | HydroNode::Tee { .. }
2619            | HydroNode::Persist { .. }
2620            | HydroNode::Unpersist { .. }
2621            | HydroNode::Delta { .. }
2622            | HydroNode::Chain { .. }
2623            | HydroNode::CrossProduct { .. }
2624            | HydroNode::CrossSingleton { .. }
2625            | HydroNode::ResolveFutures { .. }
2626            | HydroNode::ResolveFuturesOrdered { .. }
2627            | HydroNode::Join { .. }
2628            | HydroNode::Difference { .. }
2629            | HydroNode::AntiJoin { .. }
2630            | HydroNode::DeferTick { .. }
2631            | HydroNode::Enumerate { .. }
2632            | HydroNode::Unique { .. }
2633            | HydroNode::Sort { .. } => {}
2634            HydroNode::Map { f, .. }
2635            | HydroNode::FlatMap { f, .. }
2636            | HydroNode::Filter { f, .. }
2637            | HydroNode::FilterMap { f, .. }
2638            | HydroNode::Inspect { f, .. }
2639            | HydroNode::Reduce { f, .. }
2640            | HydroNode::ReduceKeyed { f, .. }
2641            | HydroNode::ReduceKeyedWatermark { f, .. } => {
2642                transform(f);
2643            }
2644            HydroNode::Fold { init, acc, .. }
2645            | HydroNode::Scan { init, acc, .. }
2646            | HydroNode::FoldKeyed { init, acc, .. } => {
2647                transform(init);
2648                transform(acc);
2649            }
2650            HydroNode::Network {
2651                serialize_fn,
2652                deserialize_fn,
2653                ..
2654            } => {
2655                if let Some(serialize_fn) = serialize_fn {
2656                    transform(serialize_fn);
2657                }
2658                if let Some(deserialize_fn) = deserialize_fn {
2659                    transform(deserialize_fn);
2660                }
2661            }
2662            HydroNode::ExternalInput { deserialize_fn, .. } => {
2663                if let Some(deserialize_fn) = deserialize_fn {
2664                    transform(deserialize_fn);
2665                }
2666            }
2667            HydroNode::Counter { duration, .. } => {
2668                transform(duration);
2669            }
2670        }
2671    }
2672
2673    pub fn metadata(&self) -> &HydroIrMetadata {
2674        match self {
2675            HydroNode::Placeholder => {
2676                panic!()
2677            }
2678            HydroNode::Source { metadata, .. } => metadata,
2679            HydroNode::CycleSource { metadata, .. } => metadata,
2680            HydroNode::Tee { metadata, .. } => metadata,
2681            HydroNode::Persist { metadata, .. } => metadata,
2682            HydroNode::Unpersist { metadata, .. } => metadata,
2683            HydroNode::Delta { metadata, .. } => metadata,
2684            HydroNode::Chain { metadata, .. } => metadata,
2685            HydroNode::CrossProduct { metadata, .. } => metadata,
2686            HydroNode::CrossSingleton { metadata, .. } => metadata,
2687            HydroNode::Join { metadata, .. } => metadata,
2688            HydroNode::Difference { metadata, .. } => metadata,
2689            HydroNode::AntiJoin { metadata, .. } => metadata,
2690            HydroNode::ResolveFutures { metadata, .. } => metadata,
2691            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2692            HydroNode::Map { metadata, .. } => metadata,
2693            HydroNode::FlatMap { metadata, .. } => metadata,
2694            HydroNode::Filter { metadata, .. } => metadata,
2695            HydroNode::FilterMap { metadata, .. } => metadata,
2696            HydroNode::DeferTick { metadata, .. } => metadata,
2697            HydroNode::Enumerate { metadata, .. } => metadata,
2698            HydroNode::Inspect { metadata, .. } => metadata,
2699            HydroNode::Unique { metadata, .. } => metadata,
2700            HydroNode::Sort { metadata, .. } => metadata,
2701            HydroNode::Scan { metadata, .. } => metadata,
2702            HydroNode::Fold { metadata, .. } => metadata,
2703            HydroNode::FoldKeyed { metadata, .. } => metadata,
2704            HydroNode::Reduce { metadata, .. } => metadata,
2705            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2706            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2707            HydroNode::ExternalInput { metadata, .. } => metadata,
2708            HydroNode::Network { metadata, .. } => metadata,
2709            HydroNode::Counter { metadata, .. } => metadata,
2710        }
2711    }
2712
2713    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2714        match self {
2715            HydroNode::Placeholder => {
2716                panic!()
2717            }
2718            HydroNode::Source { metadata, .. } => metadata,
2719            HydroNode::CycleSource { metadata, .. } => metadata,
2720            HydroNode::Tee { metadata, .. } => metadata,
2721            HydroNode::Persist { metadata, .. } => metadata,
2722            HydroNode::Unpersist { metadata, .. } => metadata,
2723            HydroNode::Delta { metadata, .. } => metadata,
2724            HydroNode::Chain { metadata, .. } => metadata,
2725            HydroNode::CrossProduct { metadata, .. } => metadata,
2726            HydroNode::CrossSingleton { metadata, .. } => metadata,
2727            HydroNode::Join { metadata, .. } => metadata,
2728            HydroNode::Difference { metadata, .. } => metadata,
2729            HydroNode::AntiJoin { metadata, .. } => metadata,
2730            HydroNode::ResolveFutures { metadata, .. } => metadata,
2731            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2732            HydroNode::Map { metadata, .. } => metadata,
2733            HydroNode::FlatMap { metadata, .. } => metadata,
2734            HydroNode::Filter { metadata, .. } => metadata,
2735            HydroNode::FilterMap { metadata, .. } => metadata,
2736            HydroNode::DeferTick { metadata, .. } => metadata,
2737            HydroNode::Enumerate { metadata, .. } => metadata,
2738            HydroNode::Inspect { metadata, .. } => metadata,
2739            HydroNode::Unique { metadata, .. } => metadata,
2740            HydroNode::Sort { metadata, .. } => metadata,
2741            HydroNode::Scan { metadata, .. } => metadata,
2742            HydroNode::Fold { metadata, .. } => metadata,
2743            HydroNode::FoldKeyed { metadata, .. } => metadata,
2744            HydroNode::Reduce { metadata, .. } => metadata,
2745            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2746            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2747            HydroNode::ExternalInput { metadata, .. } => metadata,
2748            HydroNode::Network { metadata, .. } => metadata,
2749            HydroNode::Counter { metadata, .. } => metadata,
2750        }
2751    }
2752
2753    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
2754        match self {
2755            HydroNode::Placeholder => {
2756                panic!()
2757            }
2758            HydroNode::Source { .. }
2759            | HydroNode::ExternalInput { .. }
2760            | HydroNode::CycleSource { .. } // CycleSource and Tee should calculate input metadata in separate special ways
2761            | HydroNode::Tee { .. } => {
2762                vec![]
2763            }
2764            HydroNode::Persist { inner, .. }
2765            | HydroNode::Unpersist { inner, .. }
2766            | HydroNode::Delta { inner, .. } => {
2767                vec![inner.metadata()]
2768            }
2769            HydroNode::Chain { first, second, .. } => {
2770                vec![first.metadata(), second.metadata()]
2771            }
2772            HydroNode::CrossProduct { left, right, .. }
2773            | HydroNode::CrossSingleton { left, right, .. }
2774            | HydroNode::Join { left, right, .. } => {
2775                vec![left.metadata(), right.metadata()]
2776            }
2777            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2778                vec![pos.metadata(), neg.metadata()]
2779            }
2780            HydroNode::Map { input, .. }
2781            | HydroNode::FlatMap { input, .. }
2782            | HydroNode::Filter { input, .. }
2783            | HydroNode::FilterMap { input, .. }
2784            | HydroNode::Sort { input, .. }
2785            | HydroNode::DeferTick { input, .. }
2786            | HydroNode::Enumerate { input, .. }
2787            | HydroNode::Inspect { input, .. }
2788            | HydroNode::Unique { input, .. }
2789            | HydroNode::Network { input, .. }
2790            | HydroNode::Counter { input, .. }
2791            | HydroNode::ResolveFutures { input, .. }
2792            | HydroNode::ResolveFuturesOrdered { input, .. } => {
2793                vec![input.metadata()]
2794            }
2795            HydroNode::Fold { input, .. }
2796            | HydroNode::FoldKeyed { input, .. }
2797            | HydroNode::Reduce { input, .. }
2798            | HydroNode::ReduceKeyed { input, .. }
2799            | HydroNode::Scan { input, .. } => {
2800                // Skip persist before fold/reduce
2801                if let HydroNode::Persist { inner, .. } = input.as_ref() {
2802                    vec![inner.metadata()]
2803                } else {
2804                    vec![input.metadata()]
2805                }
2806            }
2807            HydroNode::ReduceKeyedWatermark { input, watermark, .. } => {
2808                // Skip persist before fold/reduce
2809                if let HydroNode::Persist { inner, .. } = input.as_ref() {
2810                    vec![inner.metadata(), watermark.metadata()]
2811                } else {
2812                    vec![input.metadata(), watermark.metadata()]
2813                }
2814            }
2815        }
2816    }
2817
2818    pub fn print_root(&self) -> String {
2819        match self {
2820            HydroNode::Placeholder => {
2821                panic!()
2822            }
2823            HydroNode::Source { source, .. } => format!("Source({:?})", source),
2824            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
2825            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
2826            HydroNode::Persist { .. } => "Persist()".to_string(),
2827            HydroNode::Unpersist { .. } => "Unpersist()".to_string(),
2828            HydroNode::Delta { .. } => "Delta()".to_string(),
2829            HydroNode::Chain { first, second, .. } => {
2830                format!("Chain({}, {})", first.print_root(), second.print_root())
2831            }
2832            HydroNode::CrossProduct { left, right, .. } => {
2833                format!(
2834                    "CrossProduct({}, {})",
2835                    left.print_root(),
2836                    right.print_root()
2837                )
2838            }
2839            HydroNode::CrossSingleton { left, right, .. } => {
2840                format!(
2841                    "CrossSingleton({}, {})",
2842                    left.print_root(),
2843                    right.print_root()
2844                )
2845            }
2846            HydroNode::Join { left, right, .. } => {
2847                format!("Join({}, {})", left.print_root(), right.print_root())
2848            }
2849            HydroNode::Difference { pos, neg, .. } => {
2850                format!("Difference({}, {})", pos.print_root(), neg.print_root())
2851            }
2852            HydroNode::AntiJoin { pos, neg, .. } => {
2853                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
2854            }
2855            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
2856            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
2857            HydroNode::Map { f, .. } => format!("Map({:?})", f),
2858            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
2859            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
2860            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
2861            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
2862            HydroNode::Enumerate { is_static, .. } => format!("Enumerate({:?})", is_static),
2863            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
2864            HydroNode::Unique { .. } => "Unique()".to_string(),
2865            HydroNode::Sort { .. } => "Sort()".to_string(),
2866            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
2867            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
2868            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
2869            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
2870            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
2871            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
2872            HydroNode::Network { .. } => "Network()".to_string(),
2873            HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
2874            HydroNode::Counter { tag, duration, .. } => {
2875                format!("Counter({:?}, {:?})", tag, duration)
2876            }
2877        }
2878    }
2879}
2880
/// Builds the sink and source expressions for one network edge, together with
/// the boxed callback returned by the matching `D::*_connect` function.
///
/// The sender and receiver nodes are looked up in `processes` / `clusters`
/// according to the kind of `from_location` and `to_location`, a port is
/// allocated on each of them, and the one-to-one, one-to-many, many-to-one or
/// many-to-many flavor of the `Deploy` functions is selected accordingly.
///
/// Returns `(sink, source, connect_fn)`.
///
/// # Panics
///
/// Panics if a referenced process or cluster id is missing from the
/// corresponding map, or if either location is a `LocationId::Tick`.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    compile_env: &D::CompileEnv,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Lookup helpers: fetch a clone of the instantiated node or abort with a
    // message naming the missing id.
    let process_node = |id: &usize| {
        processes
            .get(id)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", id))
            .clone()
    };
    let cluster_node = |id: &usize| {
        clusters
            .get(id)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", id))
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        // process -> process
        (LocationId::Process(from), LocationId::Process(to)) => {
            let sender = process_node(from);
            let receiver = process_node(to);

            let sink_port = D::allocate_process_port(&sender);
            let source_port = D::allocate_process_port(&receiver);

            (
                D::o2o_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port),
                D::o2o_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        // process -> cluster
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let sender = process_node(from);
            let receiver = cluster_node(to);

            let sink_port = D::allocate_process_port(&sender);
            let source_port = D::allocate_cluster_port(&receiver);

            (
                D::o2m_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port),
                D::o2m_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        // cluster -> process
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let sender = cluster_node(from);
            let receiver = process_node(to);

            let sink_port = D::allocate_cluster_port(&sender);
            let source_port = D::allocate_process_port(&receiver);

            (
                D::m2o_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port),
                D::m2o_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        // cluster -> cluster
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let sender = cluster_node(from);
            let receiver = cluster_node(to);

            let sink_port = D::allocate_cluster_port(&sender);
            let source_port = D::allocate_cluster_port(&receiver);

            (
                D::m2m_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port),
                D::m2m_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        // A tick on either side is rejected.
        (LocationId::Tick(..), _) | (_, LocationId::Tick(..)) => panic!(),
    };

    (sink, source, connect_fn)
}
2986
#[cfg(test)]
mod test {
    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so that growth is noticed.
    #[test]
    fn hydro_node_size() {
        let node_bytes = std::mem::size_of::<HydroNode>();
        assert_eq!(node_bytes, 232);
    }

    /// Pins the in-memory size of `HydroLeaf` so that growth is noticed.
    #[test]
    fn hydro_leaf_size() {
        let leaf_bytes = std::mem::size_of::<HydroLeaf>();
        assert_eq!(leaf_bytes, 224);
    }

    /// An expression that contains no `q!` expansion must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(plain.clone());
        assert_eq!(result, plain);
    }

    /// Snapshot of `simplify_q_macro` applied to a spliced single-argument closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // The snapshot name and recorded expression are derived from this test,
        // so the function name and the assertion line are kept stable.
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot of `simplify_q_macro` for a quoted block that does not start with a pipe.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}