hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
32/// Wrapper that displays only the tokens of a parsed expr.
33///
34/// Boxes `syn::Type` which is ~240 bytes.
35#[derive(Clone, Hash)]
36pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39    fn from(expr: syn::Expr) -> Self {
40        Self(Box::new(expr))
41    }
42}
43
44impl Deref for DebugExpr {
45    type Target = syn::Expr;
46
47    fn deref(&self) -> &Self::Target {
48        &self.0
49    }
50}
51
52impl ToTokens for DebugExpr {
53    fn to_tokens(&self, tokens: &mut TokenStream) {
54        self.0.to_tokens(tokens);
55    }
56}
57
58impl Debug for DebugExpr {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        write!(f, "{}", self.0.to_token_stream())
61    }
62}
63
64impl Display for DebugExpr {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        let original = self.0.as_ref().clone();
67        let simplified = simplify_q_macro(original);
68
69        // For now, just use quote formatting without trying to parse as a statement
70        // This avoids the syn::parse_quote! issues entirely
71        write!(f, "q!({})", quote::quote!(#simplified))
72    }
73}
74
75/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77    // Try to parse the token string as a syn::Expr
78    // Use a visitor to simplify q! macro expansions
79    let mut simplifier = QMacroSimplifier::new();
80    simplifier.visit_expr_mut(&mut expr);
81
82    // If we found and simplified a q! macro, return the simplified version
83    if let Some(simplified) = simplifier.simplified_result {
84        simplified
85    } else {
86        expr
87    }
88}
89
90/// AST visitor that simplifies q! macro expansions
91#[derive(Default)]
92pub struct QMacroSimplifier {
93    pub simplified_result: Option<syn::Expr>,
94}
95
96impl QMacroSimplifier {
97    pub fn new() -> Self {
98        Self::default()
99    }
100}
101
102impl VisitMut for QMacroSimplifier {
103    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104        // Check if we already found a result to avoid further processing
105        if self.simplified_result.is_some() {
106            return;
107        }
108
109        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110            // Look for calls to stageleft::runtime_support::fn*
111            && self.is_stageleft_runtime_support_call(&path_expr.path)
112            // Try to extract the closure from the arguments
113            && let Some(closure) = self.extract_closure_from_args(&call.args)
114        {
115            self.simplified_result = Some(closure);
116            return;
117        }
118
119        // Continue visiting child expressions using the default implementation
120        // Use the default visitor to avoid infinite recursion
121        syn::visit_mut::visit_expr_mut(self, expr);
122    }
123}
124
125impl QMacroSimplifier {
126    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127        // Check if this is a call to stageleft::runtime_support::fn*
128        if let Some(last_segment) = path.segments.last() {
129            let fn_name = last_segment.ident.to_string();
130            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
131            fn_name.contains("_type_hint")
132                && path.segments.len() > 2
133                && path.segments[0].ident == "stageleft"
134                && path.segments[1].ident == "runtime_support"
135        } else {
136            false
137        }
138    }
139
140    fn extract_closure_from_args(
141        &self,
142        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143    ) -> Option<syn::Expr> {
144        // Look through the arguments for a closure expression
145        for arg in args {
146            if let syn::Expr::Closure(_) = arg {
147                return Some(arg.clone());
148            }
149            // Also check for closures nested in other expressions (like blocks)
150            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151                return Some(closure_expr);
152            }
153        }
154        None
155    }
156
157    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158        let mut visitor = ClosureFinder {
159            found_closure: None,
160            prefer_inner_blocks: true,
161        };
162        visitor.visit_expr(expr);
163        visitor.found_closure
164    }
165}
166
167/// Visitor that finds closures in expressions with special block handling
168struct ClosureFinder {
169    found_closure: Option<syn::Expr>,
170    prefer_inner_blocks: bool,
171}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175        // If we already found a closure, don't continue searching
176        if self.found_closure.is_some() {
177            return;
178        }
179
180        match expr {
181            syn::Expr::Closure(_) => {
182                self.found_closure = Some(expr.clone());
183            }
184            syn::Expr::Block(block) if self.prefer_inner_blocks => {
185                // Special handling for blocks - look for inner blocks that contain closures
186                for stmt in &block.block.stmts {
187                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
188                        && let syn::Expr::Block(_) = stmt_expr
189                    {
190                        // Check if this nested block contains a closure
191                        let mut inner_visitor = ClosureFinder {
192                            found_closure: None,
193                            prefer_inner_blocks: false, // Avoid infinite recursion
194                        };
195                        inner_visitor.visit_expr(stmt_expr);
196                        if inner_visitor.found_closure.is_some() {
197                            // Found a closure in an inner block, return that block
198                            self.found_closure = Some(stmt_expr.clone());
199                            return;
200                        }
201                    }
202                }
203
204                // If no inner block with closure found, continue with normal visitation
205                visit::visit_expr(self, expr);
206
207                // If we found a closure, just return the closure itself, not the whole block
208                // unless we're in the special case where we want the containing block
209                if self.found_closure.is_some() {
210                    // The closure was found during visitation, no need to wrap in block
211                }
212            }
213            _ => {
214                // Use default visitor behavior for all other expressions
215                visit::visit_expr(self, expr);
216            }
217        }
218    }
219}
220
221/// Debug displays the type's tokens.
222///
223/// Boxes `syn::Type` which is ~320 bytes.
224#[derive(Clone, Hash)]
225pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228    fn from(t: syn::Type) -> Self {
229        Self(Box::new(t))
230    }
231}
232
233impl Deref for DebugType {
234    type Target = syn::Type;
235
236    fn deref(&self) -> &Self::Target {
237        &self.0
238    }
239}
240
241impl ToTokens for DebugType {
242    fn to_tokens(&self, tokens: &mut TokenStream) {
243        self.0.to_tokens(tokens);
244    }
245}
246
247impl Debug for DebugType {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        write!(f, "{}", self.0.to_token_stream())
250    }
251}
252
253pub enum DebugInstantiate {
254    Building,
255    Finalized(Box<DebugInstantiateFinalized>),
256}
257
258#[cfg_attr(
259    not(feature = "build"),
260    expect(
261        dead_code,
262        reason = "sink, source unused without `feature = \"build\"`."
263    )
264)]
265pub struct DebugInstantiateFinalized {
266    sink: syn::Expr,
267    source: syn::Expr,
268    connect_fn: Option<Box<dyn FnOnce()>>,
269}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272    fn from(f: DebugInstantiateFinalized) -> Self {
273        Self::Finalized(Box::new(f))
274    }
275}
276
277impl Debug for DebugInstantiate {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        write!(f, "<network instantiate>")
280    }
281}
282
283impl Hash for DebugInstantiate {
284    fn hash<H: Hasher>(&self, _state: &mut H) {
285        // Do nothing
286    }
287}
288
289impl Clone for DebugInstantiate {
290    fn clone(&self) -> Self {
291        match self {
292            DebugInstantiate::Building => DebugInstantiate::Building,
293            DebugInstantiate::Finalized(_) => {
294                panic!("DebugInstantiate::Finalized should not be cloned")
295            }
296        }
297    }
298}
299
300/// A source in a Hydro graph, where data enters the graph.
301#[derive(Debug, Hash, Clone)]
302pub enum HydroSource {
303    Stream(DebugExpr),
304    ExternalNetwork(),
305    Iter(DebugExpr),
306    Spin(),
307}
308
/// Selects what `emit_core` does at each operator: emit DFIR or invoke a callback.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR into the per-location graph builders, keyed by location id.
    Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
    /// Emit nothing. Call the first closure for roots and the second for nodes, passing
    /// the operator and the current statement id.
    Callback(L, N),
}
318
319/// An root in a Hydro graph, which is an pipeline that doesn't emit
320/// any downstream values. Traversals over the dataflow graph and
321/// generating DFIR IR start from roots.
322#[derive(Debug, Hash)]
323pub enum HydroRoot {
324    ForEach {
325        f: DebugExpr,
326        input: Box<HydroNode>,
327        op_metadata: HydroIrOpMetadata,
328    },
329    SendExternal {
330        to_external_id: usize,
331        to_key: usize,
332        to_many: bool,
333        serialize_fn: Option<DebugExpr>,
334        instantiate_fn: DebugInstantiate,
335        input: Box<HydroNode>,
336        op_metadata: HydroIrOpMetadata,
337    },
338    DestSink {
339        sink: DebugExpr,
340        input: Box<HydroNode>,
341        op_metadata: HydroIrOpMetadata,
342    },
343    CycleSink {
344        ident: syn::Ident,
345        input: Box<HydroNode>,
346        out_location: LocationId,
347        op_metadata: HydroIrOpMetadata,
348    },
349}
350
351impl HydroRoot {
352    #[cfg(feature = "build")]
353    pub fn compile_network<'a, D>(
354        &mut self,
355        compile_env: &D::CompileEnv,
356        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
357        seen_tees: &mut SeenTees,
358        processes: &HashMap<usize, D::Process>,
359        clusters: &HashMap<usize, D::Cluster>,
360        externals: &HashMap<usize, D::External>,
361    ) where
362        D: Deploy<'a>,
363    {
364        self.transform_bottom_up(
365            &mut |l| {
366                if let HydroRoot::SendExternal {
367                    input,
368                    to_external_id,
369                    to_key,
370                    to_many,
371                    instantiate_fn,
372                    ..
373                } = l
374                {
375                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
376                        DebugInstantiate::Building => {
377                            let to_node = externals
378                                .get(to_external_id)
379                                .unwrap_or_else(|| {
380                                    panic!("A external used in the graph was not instantiated: {}", to_external_id)
381                                })
382                                .clone();
383
384                            match input.metadata().location_kind.root() {
385                                LocationId::Process(process_id) => {
386                                    if *to_many {
387                                        (
388                                            (
389                                                D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
390                                                parse_quote!(DUMMY),
391                                            ),
392                                            Box::new(|| {}) as Box<dyn FnOnce()>,
393                                        )
394                                    } else {
395                                        let from_node = processes
396                                            .get(process_id)
397                                            .unwrap_or_else(|| {
398                                                panic!("A process used in the graph was not instantiated: {}", process_id)
399                                            })
400                                            .clone();
401
402                                        let sink_port = D::allocate_process_port(&from_node);
403                                        let source_port = D::allocate_external_port(&to_node);
404
405                                        to_node.register(*to_key, source_port.clone());
406
407                                        (
408                                            (
409                                                D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
410                                                parse_quote!(DUMMY),
411                                            ),
412                                            D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
413                                        )
414                                    }
415                                }
416                                LocationId::Cluster(_) => todo!(),
417                                _ => panic!()
418                            }
419                        },
420
421                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
422                    };
423
424                    *instantiate_fn = DebugInstantiateFinalized {
425                        sink: sink_expr,
426                        source: source_expr,
427                        connect_fn: Some(connect_fn),
428                    }
429                    .into();
430                }
431            },
432            &mut |n| {
433                if let HydroNode::Network {
434                    input,
435                    instantiate_fn,
436                    metadata,
437                    ..
438                } = n
439                {
440                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
441                        DebugInstantiate::Building => instantiate_network::<D>(
442                            input.metadata().location_kind.root(),
443                            metadata.location_kind.root(),
444                            processes,
445                            clusters,
446                            compile_env,
447                        ),
448
449                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
450                    };
451
452                    *instantiate_fn = DebugInstantiateFinalized {
453                        sink: sink_expr,
454                        source: source_expr,
455                        connect_fn: Some(connect_fn),
456                    }
457                    .into();
458                } else if let HydroNode::ExternalInput {
459                    from_external_id,
460                    from_key,
461                    from_many,
462                    codec_type,
463                    port_hint,
464                    instantiate_fn,
465                    metadata,
466                    ..
467                } = n
468                {
469                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
470                        DebugInstantiate::Building => {
471                            let from_node = externals
472                                .get(from_external_id)
473                                .unwrap_or_else(|| {
474                                    panic!(
475                                        "A external used in the graph was not instantiated: {}",
476                                        from_external_id
477                                    )
478                                })
479                                .clone();
480
481                            match metadata.location_kind.root() {
482                                LocationId::Process(process_id) => {
483                                    let to_node = processes
484                                        .get(process_id)
485                                        .unwrap_or_else(|| {
486                                            panic!("A process used in the graph was not instantiated: {}", process_id)
487                                        })
488                                        .clone();
489
490                                    let sink_port = D::allocate_external_port(&from_node);
491                                    let source_port = D::allocate_process_port(&to_node);
492
493                                    from_node.register(*from_key, sink_port.clone());
494
495                                    (
496                                        (
497                                            parse_quote!(DUMMY),
498                                            if *from_many {
499                                                D::e2o_many_source(
500                                                    compile_env,
501                                                    extra_stmts.entry(*process_id).or_default(),
502                                                    &to_node, &source_port,
503                                                    codec_type.0.as_ref(),
504                                                    format!("{}_{}", *from_external_id, *from_key)
505                                                )
506                                            } else {
507                                                D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port)
508                                            },
509                                        ),
510                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
511                                    )
512                                }
513                                LocationId::Cluster(_) => todo!(),
514                                _ => panic!()
515                            }
516                        },
517
518                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
519                    };
520
521                    *instantiate_fn = DebugInstantiateFinalized {
522                        sink: sink_expr,
523                        source: source_expr,
524                        connect_fn: Some(connect_fn),
525                    }
526                    .into();
527                }
528            },
529            seen_tees,
530            false,
531        );
532    }
533
534    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
535        self.transform_bottom_up(
536            &mut |l| {
537                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
538                    match instantiate_fn {
539                        DebugInstantiate::Building => panic!("network not built"),
540
541                        DebugInstantiate::Finalized(finalized) => {
542                            (finalized.connect_fn.take().unwrap())();
543                        }
544                    }
545                }
546            },
547            &mut |n| {
548                if let HydroNode::Network { instantiate_fn, .. }
549                | HydroNode::ExternalInput { instantiate_fn, .. } = n
550                {
551                    match instantiate_fn {
552                        DebugInstantiate::Building => panic!("network not built"),
553
554                        DebugInstantiate::Finalized(finalized) => {
555                            (finalized.connect_fn.take().unwrap())();
556                        }
557                    }
558                }
559            },
560            seen_tees,
561            false,
562        );
563    }
564
565    pub fn transform_bottom_up(
566        &mut self,
567        transform_root: &mut impl FnMut(&mut HydroRoot),
568        transform_node: &mut impl FnMut(&mut HydroNode),
569        seen_tees: &mut SeenTees,
570        check_well_formed: bool,
571    ) {
572        self.transform_children(
573            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
574            seen_tees,
575        );
576
577        transform_root(self);
578    }
579
580    pub fn transform_children(
581        &mut self,
582        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
583        seen_tees: &mut SeenTees,
584    ) {
585        match self {
586            HydroRoot::ForEach { input, .. }
587            | HydroRoot::SendExternal { input, .. }
588            | HydroRoot::DestSink { input, .. }
589            | HydroRoot::CycleSink { input, .. } => {
590                transform(input, seen_tees);
591            }
592        }
593    }
594
595    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
596        match self {
597            HydroRoot::ForEach {
598                f,
599                input,
600                op_metadata,
601            } => HydroRoot::ForEach {
602                f: f.clone(),
603                input: Box::new(input.deep_clone(seen_tees)),
604                op_metadata: op_metadata.clone(),
605            },
606            HydroRoot::SendExternal {
607                to_external_id,
608                to_key,
609                to_many,
610                serialize_fn,
611                instantiate_fn,
612                input,
613                op_metadata,
614            } => HydroRoot::SendExternal {
615                to_external_id: *to_external_id,
616                to_key: *to_key,
617                to_many: *to_many,
618                serialize_fn: serialize_fn.clone(),
619                instantiate_fn: instantiate_fn.clone(),
620                input: Box::new(input.deep_clone(seen_tees)),
621                op_metadata: op_metadata.clone(),
622            },
623            HydroRoot::DestSink {
624                sink,
625                input,
626                op_metadata,
627            } => HydroRoot::DestSink {
628                sink: sink.clone(),
629                input: Box::new(input.deep_clone(seen_tees)),
630                op_metadata: op_metadata.clone(),
631            },
632            HydroRoot::CycleSink {
633                ident,
634                input,
635                out_location,
636                op_metadata,
637            } => HydroRoot::CycleSink {
638                ident: ident.clone(),
639                input: Box::new(input.deep_clone(seen_tees)),
640                out_location: out_location.clone(),
641                op_metadata: op_metadata.clone(),
642            },
643        }
644    }
645
646    #[cfg(feature = "build")]
647    pub fn emit(
648        &mut self,
649        graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
650        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
651        next_stmt_id: &mut usize,
652    ) {
653        self.emit_core(
654            &mut BuildersOrCallback::Builders::<
655                fn(&mut HydroRoot, &mut usize),
656                fn(&mut HydroNode, &mut usize),
657            >(graph_builders),
658            built_tees,
659            next_stmt_id,
660        );
661    }
662
663    #[cfg(feature = "build")]
664    pub fn emit_core(
665        &mut self,
666        builders_or_callback: &mut BuildersOrCallback<
667            impl FnMut(&mut HydroRoot, &mut usize),
668            impl FnMut(&mut HydroNode, &mut usize),
669        >,
670        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
671        next_stmt_id: &mut usize,
672    ) {
673        match self {
674            HydroRoot::ForEach { f, input, .. } => {
675                let (input_ident, input_location_id) =
676                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
677
678                match builders_or_callback {
679                    BuildersOrCallback::Builders(graph_builders) => {
680                        graph_builders
681                            .entry(input_location_id)
682                            .or_default()
683                            .add_dfir(
684                                parse_quote! {
685                                    #input_ident -> for_each(#f);
686                                },
687                                None,
688                                Some(&next_stmt_id.to_string()),
689                            );
690                    }
691                    BuildersOrCallback::Callback(leaf_callback, _) => {
692                        leaf_callback(self, next_stmt_id);
693                    }
694                }
695
696                *next_stmt_id += 1;
697            }
698
699            HydroRoot::SendExternal {
700                serialize_fn,
701                instantiate_fn,
702                input,
703                ..
704            } => {
705                let (input_ident, input_location_id) =
706                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
707
708                match builders_or_callback {
709                    BuildersOrCallback::Builders(graph_builders) => {
710                        let (sink_expr, _) = match instantiate_fn {
711                            DebugInstantiate::Building => (
712                                syn::parse_quote!(DUMMY_SINK),
713                                syn::parse_quote!(DUMMY_SOURCE),
714                            ),
715
716                            DebugInstantiate::Finalized(finalized) => {
717                                (finalized.sink.clone(), finalized.source.clone())
718                            }
719                        };
720
721                        let sender_builder = graph_builders.entry(input_location_id).or_default();
722                        if let Some(serialize_fn) = serialize_fn {
723                            sender_builder.add_dfir(
724                                parse_quote! {
725                                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
726                                },
727                                None,
728                                // operator tag separates send and receive, which otherwise have the same next_stmt_id
729                                Some(&format!("send{}", next_stmt_id)),
730                            );
731                        } else {
732                            sender_builder.add_dfir(
733                                parse_quote! {
734                                    #input_ident -> dest_sink(#sink_expr);
735                                },
736                                None,
737                                Some(&format!("send{}", next_stmt_id)),
738                            );
739                        }
740                    }
741                    BuildersOrCallback::Callback(leaf_callback, _) => {
742                        leaf_callback(self, next_stmt_id);
743                    }
744                }
745
746                *next_stmt_id += 1;
747            }
748
749            HydroRoot::DestSink { sink, input, .. } => {
750                let (input_ident, input_location_id) =
751                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
752
753                match builders_or_callback {
754                    BuildersOrCallback::Builders(graph_builders) => {
755                        graph_builders
756                            .entry(input_location_id)
757                            .or_default()
758                            .add_dfir(
759                                parse_quote! {
760                                    #input_ident -> dest_sink(#sink);
761                                },
762                                None,
763                                Some(&next_stmt_id.to_string()),
764                            );
765                    }
766                    BuildersOrCallback::Callback(leaf_callback, _) => {
767                        leaf_callback(self, next_stmt_id);
768                    }
769                }
770
771                *next_stmt_id += 1;
772            }
773
774            HydroRoot::CycleSink {
775                ident,
776                input,
777                out_location,
778                ..
779            } => {
780                let (input_ident, input_location_id) =
781                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
782
783                let location_id = out_location.root().raw_id();
784
785                match builders_or_callback {
786                    BuildersOrCallback::Builders(graph_builders) => {
787                        assert_eq!(
788                            input_location_id, location_id,
789                            "cycle_sink location mismatch"
790                        );
791
792                        graph_builders.entry(location_id).or_default().add_dfir(
793                            parse_quote! {
794                                #ident = #input_ident;
795                            },
796                            None,
797                            None,
798                        );
799                    }
800                    // No ID, no callback
801                    BuildersOrCallback::Callback(_, _) => {}
802                }
803            }
804        }
805    }
806
807    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
808        match self {
809            HydroRoot::ForEach { op_metadata, .. }
810            | HydroRoot::SendExternal { op_metadata, .. }
811            | HydroRoot::DestSink { op_metadata, .. }
812            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
813        }
814    }
815
816    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
817        match self {
818            HydroRoot::ForEach { op_metadata, .. }
819            | HydroRoot::SendExternal { op_metadata, .. }
820            | HydroRoot::DestSink { op_metadata, .. }
821            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
822        }
823    }
824
825    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
826        match self {
827            HydroRoot::ForEach { input, .. }
828            | HydroRoot::SendExternal { input, .. }
829            | HydroRoot::DestSink { input, .. }
830            | HydroRoot::CycleSink { input, .. } => {
831                vec![input.metadata()]
832            }
833        }
834    }
835
836    pub fn print_root(&self) -> String {
837        match self {
838            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
839            HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
840            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
841            HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
842        }
843    }
844
845    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
846        match self {
847            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
848                transform(f);
849            }
850            HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
851        }
852    }
853}
854
/// Emits every root in `ir` and returns the resulting graph builders.
///
/// A single tee cache and a single statement-ID counter are shared across all roots.
/// The returned map is keyed by `usize`; elsewhere in this file the builders are
/// looked up by a location's raw ID.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut graph_builders = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(&mut graph_builders, &mut built_tees, &mut next_stmt_id);
    });

    graph_builders
}
865
/// Walks the IR through `emit_core` in callback mode instead of building a graph.
///
/// `transform_root` and `transform_node` are wrapped in a
/// `BuildersOrCallback::Callback`; the node callback receives each node together with
/// the running statement-ID counter. The counter starts at zero and, like the tee
/// cache, is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut seen_tees = HashMap::new();
    let mut next_stmt_id = 0;
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir.iter_mut() {
        root.emit_core(&mut callback, &mut seen_tees, &mut next_stmt_id);
    }
}
879
880pub fn transform_bottom_up(
881    ir: &mut [HydroRoot],
882    transform_root: &mut impl FnMut(&mut HydroRoot),
883    transform_node: &mut impl FnMut(&mut HydroNode),
884    check_well_formed: bool,
885) {
886    let mut seen_tees = HashMap::new();
887    ir.iter_mut().for_each(|leaf| {
888        leaf.transform_bottom_up(
889            transform_root,
890            transform_node,
891            &mut seen_tees,
892            check_well_formed,
893        );
894    });
895}
896
897pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
898    let mut seen_tees = HashMap::new();
899    ir.iter()
900        .map(|leaf| leaf.deep_clone(&mut seen_tees))
901        .collect()
902}
903
// State used to de-duplicate tee nodes while debug-printing: the next tee ID to hand
// out, plus a map from a tee cell's address to the ID it was assigned. `None` means
// de-duplication is currently switched off.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Per-thread de-duplication state; starts out as `None` and is set to `Some` by
    // `dbg_dedup_tee` while its closure runs.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
908
909pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
910    PRINTED_TEES.with(|printed_tees| {
911        let mut printed_tees_mut = printed_tees.borrow_mut();
912        *printed_tees_mut = Some((0, HashMap::new()));
913        drop(printed_tees_mut);
914
915        let ret = f();
916
917        let mut printed_tees_mut = printed_tees.borrow_mut();
918        *printed_tees_mut = None;
919
920        ret
921    })
922}
923
/// Handle to the node feeding a `HydroNode::Tee`.
///
/// The node lives in an `Rc<RefCell<..>>` so that several tee nodes can refer to the
/// same upstream cell; the cell's address is used as the tee's identity in this file.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
925
926impl TeeNode {
927    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
928        Rc::as_ptr(&self.0)
929    }
930}
931
932impl Debug for TeeNode {
933    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
934        PRINTED_TEES.with(|printed_tees| {
935            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
936            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
937
938            if let Some(printed_tees_mut) = printed_tees_mut {
939                if let Some(existing) = printed_tees_mut
940                    .1
941                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
942                {
943                    write!(f, "<tee {}>", existing)
944                } else {
945                    let next_id = printed_tees_mut.0;
946                    printed_tees_mut.0 += 1;
947                    printed_tees_mut
948                        .1
949                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
950                    drop(printed_tees_mut_borrow);
951                    write!(f, "<tee {}>: ", next_id)?;
952                    Debug::fmt(&self.0.borrow(), f)
953                }
954            } else {
955                drop(printed_tees_mut_borrow);
956                write!(f, "<tee>: ")?;
957                Debug::fmt(&self.0.borrow(), f)
958            }
959        })
960    }
961}
962
963impl Hash for TeeNode {
964    fn hash<H: Hasher>(&self, state: &mut H) {
965        self.0.borrow_mut().hash(state);
966    }
967}
968
/// Metadata attached to every `HydroNode` variant except `Placeholder`.
///
/// Its `Hash` impl is a no-op and its `PartialEq` impl always returns `true`, so it
/// contributes nothing to hashing or equality. Its `Debug` output lists only
/// `location_kind` and `output_type`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location this node belongs to; its `root()` is what the well-formedness check
    /// compares between a node and its inputs.
    pub location_kind: LocationId,
    /// NOTE(review): presumably the Rust type of the items this node outputs, when
    /// known — confirm against the code that populates it.
    pub output_type: Option<DebugType>,
    /// NOTE(review): meaning not visible in this part of the file; presumably an
    /// estimated or measured output cardinality — confirm.
    pub cardinality: Option<usize>,
    /// Optional free-form label; how it is consumed is not visible in this part of the file.
    pub tag: Option<String>,
    /// Operator-level metadata (backtrace, profiling figures, ID).
    pub op: HydroIrOpMetadata,
}
977
978// HydroIrMetadata shouldn't be used to hash or compare
979impl Hash for HydroIrMetadata {
980    fn hash<H: Hasher>(&self, _: &mut H) {}
981}
982
983impl PartialEq for HydroIrMetadata {
984    fn eq(&self, _: &Self) -> bool {
985        true
986    }
987}
988
// Sound because `eq` unconditionally returns `true`, which is trivially reflexive,
// symmetric and transitive.
impl Eq for HydroIrMetadata {}
990
991impl Debug for HydroIrMetadata {
992    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
993        f.debug_struct("HydroIrMetadata")
994            .field("location_kind", &self.location_kind)
995            .field("output_type", &self.output_type)
996            .finish()
997    }
998}
999
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
///
/// The `Hash` impl is a no-op and the `Debug` impl prints only the type name, so none
/// of these fields show up in hashes or debug output.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was constructed.
    pub backtrace: Backtrace,
    /// NOTE(review): presumably a profiled CPU usage figure for this operator; unit
    /// and source are not visible here — confirm. Initialized to `None`.
    pub cpu_usage: Option<f64>,
    /// NOTE(review): presumably the CPU usage attributed to receiving network input;
    /// confirm against the profiling code. Initialized to `None`.
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional numeric ID; initialized to `None`. NOTE(review): who assigns it is not
    /// visible in this part of the file.
    pub id: Option<usize>,
}
1009
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and all optional
    /// fields set to `None`.
    ///
    /// Delegates to `new_with_skip(1)`. NOTE(review): the skip count presumably hides
    /// this constructor's own frame from the backtrace, so the call structure here
    /// must not be inlined or wrapped without adjusting the count.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Creates operator metadata, passing `2 + skip_count` to `Backtrace::get_backtrace`.
    ///
    /// NOTE(review): the constant `2` presumably accounts for this function and the
    /// backtrace capture itself — confirm against `Backtrace::get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1028
1029impl Debug for HydroIrOpMetadata {
1030    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1031        f.debug_struct("HydroIrOpMetadata").finish()
1032    }
1033}
1034
1035impl Hash for HydroIrOpMetadata {
1036    fn hash<H: Hasher>(&self, _: &mut H) {}
1037}
1038
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Every variant except `Placeholder` carries a `HydroIrMetadata`. Upstream nodes are
/// owned through `Box<HydroNode>`, except for `Tee`, whose upstream is shared.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in used while a node is moved out of its slot (for example
    /// while a tee's inner node is being transformed or cloned). Traversals such as
    /// `transform_children` and `emit_core` panic if they encounter it.
    Placeholder,

    /// A leaf node producing data from a `HydroSource`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// A leaf node identified by `ident`.
    /// NOTE(review): presumably reads the cycle written by the `CycleSink` root with
    /// the same identifier — confirm.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// A node whose upstream (`inner`) is shared with other `Tee` nodes through a
    /// reference-counted cell.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Emitted as the DFIR operator `persist::<'static>()` applied to `inner`.
    Persist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Marker node; it must be optimized away before emission (`emit_core` panics on it).
    Unpersist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as the DFIR operator `multiset_delta()` applied to `inner`.
    Delta {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // The following variants each combine two upstream nodes.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // The following variants each take a single upstream `input`; where present,
    // `f` / `init` / `acc` hold the user-supplied expressions for the operator.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        is_static: bool,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like the other reduce variants but with a second upstream node, `watermark`,
    /// in addition to `input`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only variant exempt from the "inputs share the node's root location"
    /// check performed in `transform_bottom_up`.
    Network {
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A leaf node (no upstream `HydroNode`) fed from outside the graph.
    ExternalInput {
        from_external_id: usize,
        from_key: usize,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1235
/// Cache used while traversing or cloning the IR: maps the address of an original tee
/// cell to the replacement cell created for it, so a shared tee is processed only once.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a `LocationId`.
/// NOTE(review): its users are not visible in this part of the file.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1238
1239impl HydroNode {
1240    pub fn transform_bottom_up(
1241        &mut self,
1242        transform: &mut impl FnMut(&mut HydroNode),
1243        seen_tees: &mut SeenTees,
1244        check_well_formed: bool,
1245    ) {
1246        self.transform_children(
1247            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1248            seen_tees,
1249        );
1250
1251        transform(self);
1252
1253        let self_location = self.metadata().location_kind.root();
1254
1255        if check_well_formed {
1256            match &*self {
1257                HydroNode::Network { .. } => {}
1258                _ => {
1259                    self.input_metadata().iter().for_each(|i| {
1260                        if i.location_kind.root() != self_location {
1261                            panic!(
1262                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1263                                i,
1264                                i.location_kind.root(),
1265                                self,
1266                                self_location
1267                            )
1268                        }
1269                    });
1270                }
1271            }
1272        }
1273    }
1274
1275    #[inline(always)]
1276    pub fn transform_children(
1277        &mut self,
1278        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1279        seen_tees: &mut SeenTees,
1280    ) {
1281        match self {
1282            HydroNode::Placeholder => {
1283                panic!();
1284            }
1285
1286            HydroNode::Source { .. }
1287            | HydroNode::CycleSource { .. }
1288            | HydroNode::ExternalInput { .. } => {}
1289
1290            HydroNode::Tee { inner, .. } => {
1291                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1292                    *inner = TeeNode(transformed.clone());
1293                } else {
1294                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1295                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1296                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1297                    transform(&mut orig, seen_tees);
1298                    *transformed_cell.borrow_mut() = orig;
1299                    *inner = TeeNode(transformed_cell);
1300                }
1301            }
1302
1303            HydroNode::Persist { inner, .. }
1304            | HydroNode::Unpersist { inner, .. }
1305            | HydroNode::Delta { inner, .. } => {
1306                transform(inner.as_mut(), seen_tees);
1307            }
1308
1309            HydroNode::Chain { first, second, .. } => {
1310                transform(first.as_mut(), seen_tees);
1311                transform(second.as_mut(), seen_tees);
1312            }
1313
1314            HydroNode::ChainFirst { first, second, .. } => {
1315                transform(first.as_mut(), seen_tees);
1316                transform(second.as_mut(), seen_tees);
1317            }
1318
1319            HydroNode::CrossSingleton { left, right, .. }
1320            | HydroNode::CrossProduct { left, right, .. }
1321            | HydroNode::Join { left, right, .. } => {
1322                transform(left.as_mut(), seen_tees);
1323                transform(right.as_mut(), seen_tees);
1324            }
1325
1326            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1327                transform(pos.as_mut(), seen_tees);
1328                transform(neg.as_mut(), seen_tees);
1329            }
1330
1331            HydroNode::ReduceKeyedWatermark {
1332                input, watermark, ..
1333            } => {
1334                transform(input.as_mut(), seen_tees);
1335                transform(watermark.as_mut(), seen_tees);
1336            }
1337
1338            HydroNode::Map { input, .. }
1339            | HydroNode::ResolveFutures { input, .. }
1340            | HydroNode::ResolveFuturesOrdered { input, .. }
1341            | HydroNode::FlatMap { input, .. }
1342            | HydroNode::Filter { input, .. }
1343            | HydroNode::FilterMap { input, .. }
1344            | HydroNode::Sort { input, .. }
1345            | HydroNode::DeferTick { input, .. }
1346            | HydroNode::Enumerate { input, .. }
1347            | HydroNode::Inspect { input, .. }
1348            | HydroNode::Unique { input, .. }
1349            | HydroNode::Network { input, .. }
1350            | HydroNode::Fold { input, .. }
1351            | HydroNode::Scan { input, .. }
1352            | HydroNode::FoldKeyed { input, .. }
1353            | HydroNode::Reduce { input, .. }
1354            | HydroNode::ReduceKeyed { input, .. }
1355            | HydroNode::Counter { input, .. } => {
1356                transform(input.as_mut(), seen_tees);
1357            }
1358        }
1359    }
1360
1361    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1362        match self {
1363            HydroNode::Placeholder => HydroNode::Placeholder,
1364            HydroNode::Source { source, metadata } => HydroNode::Source {
1365                source: source.clone(),
1366                metadata: metadata.clone(),
1367            },
1368            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1369                ident: ident.clone(),
1370                metadata: metadata.clone(),
1371            },
1372            HydroNode::Tee { inner, metadata } => {
1373                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1374                    HydroNode::Tee {
1375                        inner: TeeNode(transformed.clone()),
1376                        metadata: metadata.clone(),
1377                    }
1378                } else {
1379                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1380                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1381                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1382                    *new_rc.borrow_mut() = cloned;
1383                    HydroNode::Tee {
1384                        inner: TeeNode(new_rc),
1385                        metadata: metadata.clone(),
1386                    }
1387                }
1388            }
1389            HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1390                inner: Box::new(inner.deep_clone(seen_tees)),
1391                metadata: metadata.clone(),
1392            },
1393            HydroNode::Unpersist { inner, metadata } => HydroNode::Unpersist {
1394                inner: Box::new(inner.deep_clone(seen_tees)),
1395                metadata: metadata.clone(),
1396            },
1397            HydroNode::Delta { inner, metadata } => HydroNode::Delta {
1398                inner: Box::new(inner.deep_clone(seen_tees)),
1399                metadata: metadata.clone(),
1400            },
1401            HydroNode::Chain {
1402                first,
1403                second,
1404                metadata,
1405            } => HydroNode::Chain {
1406                first: Box::new(first.deep_clone(seen_tees)),
1407                second: Box::new(second.deep_clone(seen_tees)),
1408                metadata: metadata.clone(),
1409            },
1410            HydroNode::ChainFirst {
1411                first,
1412                second,
1413                metadata,
1414            } => HydroNode::ChainFirst {
1415                first: Box::new(first.deep_clone(seen_tees)),
1416                second: Box::new(second.deep_clone(seen_tees)),
1417                metadata: metadata.clone(),
1418            },
1419            HydroNode::CrossProduct {
1420                left,
1421                right,
1422                metadata,
1423            } => HydroNode::CrossProduct {
1424                left: Box::new(left.deep_clone(seen_tees)),
1425                right: Box::new(right.deep_clone(seen_tees)),
1426                metadata: metadata.clone(),
1427            },
1428            HydroNode::CrossSingleton {
1429                left,
1430                right,
1431                metadata,
1432            } => HydroNode::CrossSingleton {
1433                left: Box::new(left.deep_clone(seen_tees)),
1434                right: Box::new(right.deep_clone(seen_tees)),
1435                metadata: metadata.clone(),
1436            },
1437            HydroNode::Join {
1438                left,
1439                right,
1440                metadata,
1441            } => HydroNode::Join {
1442                left: Box::new(left.deep_clone(seen_tees)),
1443                right: Box::new(right.deep_clone(seen_tees)),
1444                metadata: metadata.clone(),
1445            },
1446            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1447                pos: Box::new(pos.deep_clone(seen_tees)),
1448                neg: Box::new(neg.deep_clone(seen_tees)),
1449                metadata: metadata.clone(),
1450            },
1451            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1452                pos: Box::new(pos.deep_clone(seen_tees)),
1453                neg: Box::new(neg.deep_clone(seen_tees)),
1454                metadata: metadata.clone(),
1455            },
1456            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1457                input: Box::new(input.deep_clone(seen_tees)),
1458                metadata: metadata.clone(),
1459            },
1460            HydroNode::ResolveFuturesOrdered { input, metadata } => {
1461                HydroNode::ResolveFuturesOrdered {
1462                    input: Box::new(input.deep_clone(seen_tees)),
1463                    metadata: metadata.clone(),
1464                }
1465            }
1466            HydroNode::Map { f, input, metadata } => HydroNode::Map {
1467                f: f.clone(),
1468                input: Box::new(input.deep_clone(seen_tees)),
1469                metadata: metadata.clone(),
1470            },
1471            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1472                f: f.clone(),
1473                input: Box::new(input.deep_clone(seen_tees)),
1474                metadata: metadata.clone(),
1475            },
1476            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1477                f: f.clone(),
1478                input: Box::new(input.deep_clone(seen_tees)),
1479                metadata: metadata.clone(),
1480            },
1481            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1482                f: f.clone(),
1483                input: Box::new(input.deep_clone(seen_tees)),
1484                metadata: metadata.clone(),
1485            },
1486            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1487                input: Box::new(input.deep_clone(seen_tees)),
1488                metadata: metadata.clone(),
1489            },
1490            HydroNode::Enumerate {
1491                is_static,
1492                input,
1493                metadata,
1494            } => HydroNode::Enumerate {
1495                is_static: *is_static,
1496                input: Box::new(input.deep_clone(seen_tees)),
1497                metadata: metadata.clone(),
1498            },
1499            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1500                f: f.clone(),
1501                input: Box::new(input.deep_clone(seen_tees)),
1502                metadata: metadata.clone(),
1503            },
1504            HydroNode::Unique { input, metadata } => HydroNode::Unique {
1505                input: Box::new(input.deep_clone(seen_tees)),
1506                metadata: metadata.clone(),
1507            },
1508            HydroNode::Sort { input, metadata } => HydroNode::Sort {
1509                input: Box::new(input.deep_clone(seen_tees)),
1510                metadata: metadata.clone(),
1511            },
1512            HydroNode::Fold {
1513                init,
1514                acc,
1515                input,
1516                metadata,
1517            } => HydroNode::Fold {
1518                init: init.clone(),
1519                acc: acc.clone(),
1520                input: Box::new(input.deep_clone(seen_tees)),
1521                metadata: metadata.clone(),
1522            },
1523            HydroNode::Scan {
1524                init,
1525                acc,
1526                input,
1527                metadata,
1528            } => HydroNode::Scan {
1529                init: init.clone(),
1530                acc: acc.clone(),
1531                input: Box::new(input.deep_clone(seen_tees)),
1532                metadata: metadata.clone(),
1533            },
1534            HydroNode::FoldKeyed {
1535                init,
1536                acc,
1537                input,
1538                metadata,
1539            } => HydroNode::FoldKeyed {
1540                init: init.clone(),
1541                acc: acc.clone(),
1542                input: Box::new(input.deep_clone(seen_tees)),
1543                metadata: metadata.clone(),
1544            },
1545            HydroNode::ReduceKeyedWatermark {
1546                f,
1547                input,
1548                watermark,
1549                metadata,
1550            } => HydroNode::ReduceKeyedWatermark {
1551                f: f.clone(),
1552                input: Box::new(input.deep_clone(seen_tees)),
1553                watermark: Box::new(watermark.deep_clone(seen_tees)),
1554                metadata: metadata.clone(),
1555            },
1556            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1557                f: f.clone(),
1558                input: Box::new(input.deep_clone(seen_tees)),
1559                metadata: metadata.clone(),
1560            },
1561            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1562                f: f.clone(),
1563                input: Box::new(input.deep_clone(seen_tees)),
1564                metadata: metadata.clone(),
1565            },
1566            HydroNode::Network {
1567                serialize_fn,
1568                instantiate_fn,
1569                deserialize_fn,
1570                input,
1571                metadata,
1572            } => HydroNode::Network {
1573                serialize_fn: serialize_fn.clone(),
1574                instantiate_fn: instantiate_fn.clone(),
1575                deserialize_fn: deserialize_fn.clone(),
1576                input: Box::new(input.deep_clone(seen_tees)),
1577                metadata: metadata.clone(),
1578            },
1579            HydroNode::ExternalInput {
1580                from_external_id,
1581                from_key,
1582                from_many,
1583                codec_type,
1584                port_hint,
1585                instantiate_fn,
1586                deserialize_fn,
1587                metadata,
1588            } => HydroNode::ExternalInput {
1589                from_external_id: *from_external_id,
1590                from_key: *from_key,
1591                from_many: *from_many,
1592                codec_type: codec_type.clone(),
1593                port_hint: *port_hint,
1594                instantiate_fn: instantiate_fn.clone(),
1595                deserialize_fn: deserialize_fn.clone(),
1596                metadata: metadata.clone(),
1597            },
1598            HydroNode::Counter {
1599                tag,
1600                duration,
1601                prefix,
1602                input,
1603                metadata,
1604            } => HydroNode::Counter {
1605                tag: tag.clone(),
1606                duration: duration.clone(),
1607                prefix: prefix.clone(),
1608                input: Box::new(input.deep_clone(seen_tees)),
1609                metadata: metadata.clone(),
1610            },
1611        }
1612    }
1613
1614    #[cfg(feature = "build")]
1615    pub fn emit_core(
1616        &mut self,
1617        builders_or_callback: &mut BuildersOrCallback<
1618            impl FnMut(&mut HydroRoot, &mut usize),
1619            impl FnMut(&mut HydroNode, &mut usize),
1620        >,
1621        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1622        next_stmt_id: &mut usize,
1623    ) -> (syn::Ident, usize) {
1624        match self {
1625            HydroNode::Placeholder => {
1626                panic!()
1627            }
1628
1629            HydroNode::Persist { inner, .. } => {
1630                let (inner_ident, location) =
1631                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1632
1633                let persist_ident =
1634                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1635
1636                match builders_or_callback {
1637                    BuildersOrCallback::Builders(graph_builders) => {
1638                        let builder = graph_builders.entry(location).or_default();
1639                        builder.add_dfir(
1640                            parse_quote! {
1641                                #persist_ident = #inner_ident -> persist::<'static>();
1642                            },
1643                            None,
1644                            Some(&next_stmt_id.to_string()),
1645                        );
1646                    }
1647                    BuildersOrCallback::Callback(_, node_callback) => {
1648                        node_callback(self, next_stmt_id);
1649                    }
1650                }
1651
1652                *next_stmt_id += 1;
1653
1654                (persist_ident, location)
1655            }
1656
1657            HydroNode::Unpersist { .. } => {
1658                panic!(
1659                    "Unpersist is a marker node and should have been optimized away. This is likely a compiler bug."
1660                )
1661            }
1662
1663            HydroNode::Delta { inner, .. } => {
1664                let (inner_ident, location) =
1665                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1666
1667                let delta_ident =
1668                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1669
1670                match builders_or_callback {
1671                    BuildersOrCallback::Builders(graph_builders) => {
1672                        let builder = graph_builders.entry(location).or_default();
1673                        builder.add_dfir(
1674                            parse_quote! {
1675                                #delta_ident = #inner_ident -> multiset_delta();
1676                            },
1677                            None,
1678                            Some(&next_stmt_id.to_string()),
1679                        );
1680                    }
1681                    BuildersOrCallback::Callback(_, node_callback) => {
1682                        node_callback(self, next_stmt_id);
1683                    }
1684                }
1685
1686                *next_stmt_id += 1;
1687
1688                (delta_ident, location)
1689            }
1690
1691            HydroNode::Source {
1692                source, metadata, ..
1693            } => {
1694                let location_id = metadata.location_kind.root().raw_id();
1695
1696                if let HydroSource::ExternalNetwork() = source {
1697                    (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1698                } else {
1699                    let source_ident =
1700                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1701
1702                    let source_stmt = match source {
1703                        HydroSource::Stream(expr) => {
1704                            parse_quote! {
1705                                #source_ident = source_stream(#expr);
1706                            }
1707                        }
1708
1709                        HydroSource::ExternalNetwork() => {
1710                            unreachable!()
1711                        }
1712
1713                        HydroSource::Iter(expr) => {
1714                            parse_quote! {
1715                                #source_ident = source_iter(#expr);
1716                            }
1717                        }
1718
1719                        HydroSource::Spin() => {
1720                            parse_quote! {
1721                                #source_ident = spin();
1722                            }
1723                        }
1724                    };
1725
1726                    match builders_or_callback {
1727                        BuildersOrCallback::Builders(graph_builders) => {
1728                            let builder = graph_builders.entry(location_id).or_default();
1729                            builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1730                        }
1731                        BuildersOrCallback::Callback(_, node_callback) => {
1732                            node_callback(self, next_stmt_id);
1733                        }
1734                    }
1735
1736                    *next_stmt_id += 1;
1737
1738                    (source_ident, location_id)
1739                }
1740            }
1741
1742            HydroNode::CycleSource {
1743                ident, metadata, ..
1744            } => {
1745                let location_id = metadata.location_kind.root().raw_id();
1746
1747                let ident = ident.clone();
1748
1749                match builders_or_callback {
1750                    BuildersOrCallback::Builders(_) => {}
1751                    BuildersOrCallback::Callback(_, node_callback) => {
1752                        node_callback(self, next_stmt_id);
1753                    }
1754                }
1755
1756                // consume a stmt id even though we did not emit anything so that we can instrument this
1757                *next_stmt_id += 1;
1758
1759                (ident, location_id)
1760            }
1761
1762            HydroNode::Tee { inner, .. } => {
1763                let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1764                    built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1765                {
1766                    match builders_or_callback {
1767                        BuildersOrCallback::Builders(_) => {}
1768                        BuildersOrCallback::Callback(_, node_callback) => {
1769                            node_callback(self, next_stmt_id);
1770                        }
1771                    }
1772
1773                    (teed_from.clone(), *inner_location_id)
1774                } else {
1775                    let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1776                        builders_or_callback,
1777                        built_tees,
1778                        next_stmt_id,
1779                    );
1780
1781                    let tee_ident =
1782                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1783
1784                    built_tees.insert(
1785                        inner.0.as_ref() as *const RefCell<HydroNode>,
1786                        (tee_ident.clone(), inner_location_id),
1787                    );
1788
1789                    match builders_or_callback {
1790                        BuildersOrCallback::Builders(graph_builders) => {
1791                            let builder = graph_builders.entry(inner_location_id).or_default();
1792                            builder.add_dfir(
1793                                parse_quote! {
1794                                    #tee_ident = #inner_ident -> tee();
1795                                },
1796                                None,
1797                                Some(&next_stmt_id.to_string()),
1798                            );
1799                        }
1800                        BuildersOrCallback::Callback(_, node_callback) => {
1801                            node_callback(self, next_stmt_id);
1802                        }
1803                    }
1804
1805                    (tee_ident, inner_location_id)
1806                };
1807
1808                // we consume a stmt id regardless of whether we emit the tee() operator,
1809                // so that during rewrites we touch all recipients of the tee()
1810
1811                *next_stmt_id += 1;
1812                (ret_ident, inner_location_id)
1813            }
1814
1815            HydroNode::Chain { first, second, .. } => {
1816                let (first_ident, first_location_id) =
1817                    first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1818                let (second_ident, second_location_id) =
1819                    second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1820
1821                let chain_ident =
1822                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1823
1824                match builders_or_callback {
1825                    BuildersOrCallback::Builders(graph_builders) => {
1826                        assert_eq!(
1827                            first_location_id, second_location_id,
1828                            "chain inputs must be in the same location"
1829                        );
1830                        let builder = graph_builders.entry(first_location_id).or_default();
1831                        builder.add_dfir(
1832                            parse_quote! {
1833                                #chain_ident = chain();
1834                                #first_ident -> [0]#chain_ident;
1835                                #second_ident -> [1]#chain_ident;
1836                            },
1837                            None,
1838                            Some(&next_stmt_id.to_string()),
1839                        );
1840                    }
1841                    BuildersOrCallback::Callback(_, node_callback) => {
1842                        node_callback(self, next_stmt_id);
1843                    }
1844                }
1845
1846                *next_stmt_id += 1;
1847
1848                (chain_ident, first_location_id)
1849            }
1850
1851            HydroNode::ChainFirst { first, second, .. } => {
1852                let (first_ident, first_location_id) =
1853                    first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1854                let (second_ident, second_location_id) =
1855                    second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1856
1857                let chain_ident =
1858                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1859
1860                match builders_or_callback {
1861                    BuildersOrCallback::Builders(graph_builders) => {
1862                        assert_eq!(
1863                            first_location_id, second_location_id,
1864                            "chain inputs must be in the same location"
1865                        );
1866                        let builder = graph_builders.entry(first_location_id).or_default();
1867                        builder.add_dfir(
1868                            parse_quote! {
1869                                #chain_ident = chain_first_n(1);
1870                                #first_ident -> [0]#chain_ident;
1871                                #second_ident -> [1]#chain_ident;
1872                            },
1873                            None,
1874                            Some(&next_stmt_id.to_string()),
1875                        );
1876                    }
1877                    BuildersOrCallback::Callback(_, node_callback) => {
1878                        node_callback(self, next_stmt_id);
1879                    }
1880                }
1881
1882                *next_stmt_id += 1;
1883
1884                (chain_ident, first_location_id)
1885            }
1886
1887            HydroNode::CrossSingleton { left, right, .. } => {
1888                let (left_ident, left_location_id) =
1889                    left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1890                let (right_ident, right_location_id) =
1891                    right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1892
1893                let cross_ident =
1894                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1895
1896                match builders_or_callback {
1897                    BuildersOrCallback::Builders(graph_builders) => {
1898                        assert_eq!(
1899                            left_location_id, right_location_id,
1900                            "cross_singleton inputs must be in the same location"
1901                        );
1902
1903                        let builder = graph_builders.entry(left_location_id).or_default();
1904                        builder.add_dfir(
1905                            parse_quote! {
1906                                #cross_ident = cross_singleton();
1907                                #left_ident -> [input]#cross_ident;
1908                                #right_ident -> [single]#cross_ident;
1909                            },
1910                            None,
1911                            Some(&next_stmt_id.to_string()),
1912                        );
1913                    }
1914                    BuildersOrCallback::Callback(_, node_callback) => {
1915                        node_callback(self, next_stmt_id);
1916                    }
1917                }
1918
1919                *next_stmt_id += 1;
1920
1921                (cross_ident, left_location_id)
1922            }
1923
1924            HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1925                let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1926                    parse_quote!(cross_join_multiset)
1927                } else {
1928                    parse_quote!(join_multiset)
1929                };
1930
1931                let (HydroNode::CrossProduct { left, right, .. }
1932                | HydroNode::Join { left, right, .. }) = self
1933                else {
1934                    unreachable!()
1935                };
1936
1937                let (left_inner, left_lifetime) =
1938                    if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1939                        (left, quote!('static))
1940                    } else {
1941                        (left, quote!('tick))
1942                    };
1943
1944                let (right_inner, right_lifetime) =
1945                    if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
1946                        (right, quote!('static))
1947                    } else {
1948                        (right, quote!('tick))
1949                    };
1950
1951                let (left_ident, left_location_id) =
1952                    left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1953                let (right_ident, right_location_id) =
1954                    right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1955
1956                let stream_ident =
1957                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1958
1959                match builders_or_callback {
1960                    BuildersOrCallback::Builders(graph_builders) => {
1961                        assert_eq!(
1962                            left_location_id, right_location_id,
1963                            "join / cross product inputs must be in the same location"
1964                        );
1965
1966                        let builder = graph_builders.entry(left_location_id).or_default();
1967                        builder.add_dfir(
1968                            parse_quote! {
1969                                #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
1970                                #left_ident -> [0]#stream_ident;
1971                                #right_ident -> [1]#stream_ident;
1972                            },
1973                            None,
1974                            Some(&next_stmt_id.to_string()),
1975                        );
1976                    }
1977                    BuildersOrCallback::Callback(_, node_callback) => {
1978                        node_callback(self, next_stmt_id);
1979                    }
1980                }
1981
1982                *next_stmt_id += 1;
1983
1984                (stream_ident, left_location_id)
1985            }
1986
1987            HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
1988                let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
1989                    parse_quote!(difference_multiset)
1990                } else {
1991                    parse_quote!(anti_join_multiset)
1992                };
1993
1994                let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
1995                    self
1996                else {
1997                    unreachable!()
1998                };
1999
2000                let (neg, neg_lifetime) =
2001                    if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
2002                        (neg, quote!('static))
2003                    } else {
2004                        (neg, quote!('tick))
2005                    };
2006
2007                let (pos_ident, pos_location_id) =
2008                    pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
2009                let (neg_ident, neg_location_id) =
2010                    neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
2011
2012                let stream_ident =
2013                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2014
2015                match builders_or_callback {
2016                    BuildersOrCallback::Builders(graph_builders) => {
2017                        assert_eq!(
2018                            pos_location_id, neg_location_id,
2019                            "difference / anti join inputs must be in the same location"
2020                        );
2021
2022                        let builder = graph_builders.entry(pos_location_id).or_default();
2023                        builder.add_dfir(
2024                            parse_quote! {
2025                                #stream_ident = #operator::<'tick, #neg_lifetime>();
2026                                #pos_ident -> [pos]#stream_ident;
2027                                #neg_ident -> [neg]#stream_ident;
2028                            },
2029                            None,
2030                            Some(&next_stmt_id.to_string()),
2031                        );
2032                    }
2033                    BuildersOrCallback::Callback(_, node_callback) => {
2034                        node_callback(self, next_stmt_id);
2035                    }
2036                }
2037
2038                *next_stmt_id += 1;
2039
2040                (stream_ident, pos_location_id)
2041            }
2042
2043            HydroNode::ResolveFutures { input, .. } => {
2044                let (input_ident, input_location_id) =
2045                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2046
2047                let futures_ident =
2048                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2049
2050                match builders_or_callback {
2051                    BuildersOrCallback::Builders(graph_builders) => {
2052                        let builder = graph_builders.entry(input_location_id).or_default();
2053                        builder.add_dfir(
2054                            parse_quote! {
2055                                #futures_ident = #input_ident -> resolve_futures();
2056                            },
2057                            None,
2058                            Some(&next_stmt_id.to_string()),
2059                        );
2060                    }
2061                    BuildersOrCallback::Callback(_, node_callback) => {
2062                        node_callback(self, next_stmt_id);
2063                    }
2064                }
2065
2066                *next_stmt_id += 1;
2067
2068                (futures_ident, input_location_id)
2069            }
2070
2071            HydroNode::ResolveFuturesOrdered { input, .. } => {
2072                let (input_ident, input_location_id) =
2073                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2074
2075                let futures_ident =
2076                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2077
2078                match builders_or_callback {
2079                    BuildersOrCallback::Builders(graph_builders) => {
2080                        let builder = graph_builders.entry(input_location_id).or_default();
2081                        builder.add_dfir(
2082                            parse_quote! {
2083                                #futures_ident = #input_ident -> resolve_futures_ordered();
2084                            },
2085                            None,
2086                            Some(&next_stmt_id.to_string()),
2087                        );
2088                    }
2089                    BuildersOrCallback::Callback(_, node_callback) => {
2090                        node_callback(self, next_stmt_id);
2091                    }
2092                }
2093
2094                *next_stmt_id += 1;
2095
2096                (futures_ident, input_location_id)
2097            }
2098
2099            HydroNode::Map { f, input, .. } => {
2100                let (input_ident, input_location_id) =
2101                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2102
2103                let map_ident =
2104                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2105
2106                match builders_or_callback {
2107                    BuildersOrCallback::Builders(graph_builders) => {
2108                        let builder = graph_builders.entry(input_location_id).or_default();
2109                        builder.add_dfir(
2110                            parse_quote! {
2111                                #map_ident = #input_ident -> map(#f);
2112                            },
2113                            None,
2114                            Some(&next_stmt_id.to_string()),
2115                        );
2116                    }
2117                    BuildersOrCallback::Callback(_, node_callback) => {
2118                        node_callback(self, next_stmt_id);
2119                    }
2120                }
2121
2122                *next_stmt_id += 1;
2123
2124                (map_ident, input_location_id)
2125            }
2126
2127            HydroNode::FlatMap { f, input, .. } => {
2128                let (input_ident, input_location_id) =
2129                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2130
2131                let flat_map_ident =
2132                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2133
2134                match builders_or_callback {
2135                    BuildersOrCallback::Builders(graph_builders) => {
2136                        let builder = graph_builders.entry(input_location_id).or_default();
2137                        builder.add_dfir(
2138                            parse_quote! {
2139                                #flat_map_ident = #input_ident -> flat_map(#f);
2140                            },
2141                            None,
2142                            Some(&next_stmt_id.to_string()),
2143                        );
2144                    }
2145                    BuildersOrCallback::Callback(_, node_callback) => {
2146                        node_callback(self, next_stmt_id);
2147                    }
2148                }
2149
2150                *next_stmt_id += 1;
2151
2152                (flat_map_ident, input_location_id)
2153            }
2154
2155            HydroNode::Filter { f, input, .. } => {
2156                let (input_ident, input_location_id) =
2157                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2158
2159                let filter_ident =
2160                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2161
2162                match builders_or_callback {
2163                    BuildersOrCallback::Builders(graph_builders) => {
2164                        let builder = graph_builders.entry(input_location_id).or_default();
2165                        builder.add_dfir(
2166                            parse_quote! {
2167                                #filter_ident = #input_ident -> filter(#f);
2168                            },
2169                            None,
2170                            Some(&next_stmt_id.to_string()),
2171                        );
2172                    }
2173                    BuildersOrCallback::Callback(_, node_callback) => {
2174                        node_callback(self, next_stmt_id);
2175                    }
2176                }
2177
2178                *next_stmt_id += 1;
2179
2180                (filter_ident, input_location_id)
2181            }
2182
2183            HydroNode::FilterMap { f, input, .. } => {
2184                let (input_ident, input_location_id) =
2185                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2186
2187                let filter_map_ident =
2188                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2189
2190                match builders_or_callback {
2191                    BuildersOrCallback::Builders(graph_builders) => {
2192                        let builder = graph_builders.entry(input_location_id).or_default();
2193                        builder.add_dfir(
2194                            parse_quote! {
2195                                #filter_map_ident = #input_ident -> filter_map(#f);
2196                            },
2197                            None,
2198                            Some(&next_stmt_id.to_string()),
2199                        );
2200                    }
2201                    BuildersOrCallback::Callback(_, node_callback) => {
2202                        node_callback(self, next_stmt_id);
2203                    }
2204                }
2205
2206                *next_stmt_id += 1;
2207
2208                (filter_map_ident, input_location_id)
2209            }
2210
2211            HydroNode::Sort { input, .. } => {
2212                let (input_ident, input_location_id) =
2213                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2214
2215                let sort_ident =
2216                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2217
2218                match builders_or_callback {
2219                    BuildersOrCallback::Builders(graph_builders) => {
2220                        let builder = graph_builders.entry(input_location_id).or_default();
2221                        builder.add_dfir(
2222                            parse_quote! {
2223                                #sort_ident = #input_ident -> sort();
2224                            },
2225                            None,
2226                            Some(&next_stmt_id.to_string()),
2227                        );
2228                    }
2229                    BuildersOrCallback::Callback(_, node_callback) => {
2230                        node_callback(self, next_stmt_id);
2231                    }
2232                }
2233
2234                *next_stmt_id += 1;
2235
2236                (sort_ident, input_location_id)
2237            }
2238
2239            HydroNode::DeferTick { input, .. } => {
2240                let (input_ident, input_location_id) =
2241                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2242
2243                let defer_tick_ident =
2244                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2245
2246                match builders_or_callback {
2247                    BuildersOrCallback::Builders(graph_builders) => {
2248                        let builder = graph_builders.entry(input_location_id).or_default();
2249                        builder.add_dfir(
2250                            parse_quote! {
2251                                #defer_tick_ident = #input_ident -> defer_tick_lazy();
2252                            },
2253                            None,
2254                            Some(&next_stmt_id.to_string()),
2255                        );
2256                    }
2257                    BuildersOrCallback::Callback(_, node_callback) => {
2258                        node_callback(self, next_stmt_id);
2259                    }
2260                }
2261
2262                *next_stmt_id += 1;
2263
2264                (defer_tick_ident, input_location_id)
2265            }
2266
2267            HydroNode::Enumerate {
2268                is_static, input, ..
2269            } => {
2270                let (input_ident, input_location_id) =
2271                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2272
2273                let enumerate_ident =
2274                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2275
2276                match builders_or_callback {
2277                    BuildersOrCallback::Builders(graph_builders) => {
2278                        let builder = graph_builders.entry(input_location_id).or_default();
2279                        let lifetime = if *is_static {
2280                            quote!('static)
2281                        } else {
2282                            quote!('tick)
2283                        };
2284                        builder.add_dfir(
2285                            parse_quote! {
2286                                #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2287                            },
2288                            None,
2289                            Some(&next_stmt_id.to_string()),
2290                        );
2291                    }
2292                    BuildersOrCallback::Callback(_, node_callback) => {
2293                        node_callback(self, next_stmt_id);
2294                    }
2295                }
2296
2297                *next_stmt_id += 1;
2298
2299                (enumerate_ident, input_location_id)
2300            }
2301
2302            HydroNode::Inspect { f, input, .. } => {
2303                let (input_ident, input_location_id) =
2304                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2305
2306                let inspect_ident =
2307                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2308
2309                match builders_or_callback {
2310                    BuildersOrCallback::Builders(graph_builders) => {
2311                        let builder = graph_builders.entry(input_location_id).or_default();
2312                        builder.add_dfir(
2313                            parse_quote! {
2314                                #inspect_ident = #input_ident -> inspect(#f);
2315                            },
2316                            None,
2317                            Some(&next_stmt_id.to_string()),
2318                        );
2319                    }
2320                    BuildersOrCallback::Callback(_, node_callback) => {
2321                        node_callback(self, next_stmt_id);
2322                    }
2323                }
2324
2325                *next_stmt_id += 1;
2326
2327                (inspect_ident, input_location_id)
2328            }
2329
2330            HydroNode::Unique { input, .. } => {
2331                let (input_ident, input_location_id) =
2332                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2333
2334                let unique_ident =
2335                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2336
2337                match builders_or_callback {
2338                    BuildersOrCallback::Builders(graph_builders) => {
2339                        let builder = graph_builders.entry(input_location_id).or_default();
2340                        builder.add_dfir(
2341                            parse_quote! {
2342                                #unique_ident = #input_ident -> unique::<'tick>();
2343                            },
2344                            None,
2345                            Some(&next_stmt_id.to_string()),
2346                        );
2347                    }
2348                    BuildersOrCallback::Callback(_, node_callback) => {
2349                        node_callback(self, next_stmt_id);
2350                    }
2351                }
2352
2353                *next_stmt_id += 1;
2354
2355                (unique_ident, input_location_id)
2356            }
2357
2358            HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2359                let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2360                    parse_quote!(fold)
2361                } else if matches!(self, HydroNode::Scan { .. }) {
2362                    parse_quote!(scan)
2363                } else {
2364                    parse_quote!(fold_keyed)
2365                };
2366
2367                let (HydroNode::Fold {
2368                    init, acc, input, ..
2369                }
2370                | HydroNode::FoldKeyed {
2371                    init, acc, input, ..
2372                }
2373                | HydroNode::Scan {
2374                    init, acc, input, ..
2375                }) = self
2376                else {
2377                    unreachable!()
2378                };
2379
2380                let (input, lifetime) =
2381                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2382                        (input, quote!('static))
2383                    } else {
2384                        (input, quote!('tick))
2385                    };
2386
2387                let (input_ident, input_location_id) =
2388                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2389
2390                let fold_ident =
2391                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2392
2393                match builders_or_callback {
2394                    BuildersOrCallback::Builders(graph_builders) => {
2395                        let builder = graph_builders.entry(input_location_id).or_default();
2396                        builder.add_dfir(
2397                            parse_quote! {
2398                                #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
2399                            },
2400                            None,
2401                            Some(&next_stmt_id.to_string()),
2402                        );
2403                    }
2404                    BuildersOrCallback::Callback(_, node_callback) => {
2405                        node_callback(self, next_stmt_id);
2406                    }
2407                }
2408
2409                *next_stmt_id += 1;
2410
2411                (fold_ident, input_location_id)
2412            }
2413
2414            HydroNode::ReduceKeyedWatermark {
2415                f,
2416                input,
2417                watermark,
2418                ..
2419            } => {
2420                let (input, lifetime) =
2421                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2422                        (input, quote!('static))
2423                    } else {
2424                        (input, quote!('tick))
2425                    };
2426
2427                let (input_ident, input_location_id) =
2428                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2429
2430                let (watermark_ident, watermark_location_id) =
2431                    watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
2432
2433                let chain_ident = syn::Ident::new(
2434                    &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
2435                    Span::call_site(),
2436                );
2437
2438                let fold_ident =
2439                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2440
2441                match builders_or_callback {
2442                    BuildersOrCallback::Builders(graph_builders) => {
2443                        assert_eq!(
2444                            input_location_id, watermark_location_id,
2445                            "ReduceKeyedWatermark inputs must be in the same location"
2446                        );
2447
2448                        let builder = graph_builders.entry(input_location_id).or_default();
2449                        // 1. Don't allow any values to be added to the map if the key <=the watermark
2450                        // 2. If the entry didn't exist in the BTreeMap, add it. Otherwise, call f.
2451                        //    If the watermark changed, delete all BTreeMap entries with a key < the watermark.
2452                        // 3. Convert the BTreeMap back into a stream of (k, v)
2453                        builder.add_dfir(
2454                            parse_quote! {
2455                                #chain_ident = chain();
2456                                #input_ident
2457                                    -> map(|x| (Some(x), None))
2458                                    -> [0]#chain_ident;
2459                                #watermark_ident
2460                                    -> map(|watermark| (None, Some(watermark)))
2461                                    -> [1]#chain_ident;
2462
2463                                #fold_ident = #chain_ident
2464                                    -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
2465                                        let __reduce_keyed_fn = #f;
2466                                        move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
2467                                            if let Some((k, v)) = opt_payload {
2468                                                if let Some(curr_watermark) = *opt_curr_watermark {
2469                                                    if k <= curr_watermark {
2470                                                        return;
2471                                                    }
2472                                                }
2473                                                match map.entry(k) {
2474                                                    ::std::collections::hash_map::Entry::Vacant(e) => {
2475                                                        e.insert(v);
2476                                                    }
2477                                                    ::std::collections::hash_map::Entry::Occupied(mut e) => {
2478                                                        __reduce_keyed_fn(e.get_mut(), v);
2479                                                    }
2480                                                }
2481                                            } else {
2482                                                let watermark = opt_watermark.unwrap();
2483                                                if let Some(curr_watermark) = *opt_curr_watermark {
2484                                                    if watermark <= curr_watermark {
2485                                                        return;
2486                                                    }
2487                                                }
2488                                                *opt_curr_watermark = opt_watermark;
2489                                                map.retain(|k, _| *k > watermark);
2490                                            }
2491                                        }
2492                                    })
2493                                    -> flat_map(|(map, _curr_watermark)| map);
2494                            },
2495                            None,
2496                            Some(&next_stmt_id.to_string()),
2497                        );
2498                    }
2499                    BuildersOrCallback::Callback(_, node_callback) => {
2500                        node_callback(self, next_stmt_id);
2501                    }
2502                }
2503
2504                *next_stmt_id += 1;
2505
2506                (fold_ident, input_location_id)
2507            }
2508
2509            HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
2510                let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
2511                    parse_quote!(reduce)
2512                } else {
2513                    parse_quote!(reduce_keyed)
2514                };
2515
2516                let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
2517                    self
2518                else {
2519                    unreachable!()
2520                };
2521
2522                let (input, lifetime) =
2523                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2524                        (input, quote!('static))
2525                    } else {
2526                        (input, quote!('tick))
2527                    };
2528
2529                let (input_ident, input_location_id) =
2530                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2531
2532                let reduce_ident =
2533                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2534
2535                match builders_or_callback {
2536                    BuildersOrCallback::Builders(graph_builders) => {
2537                        let builder = graph_builders.entry(input_location_id).or_default();
2538                        builder.add_dfir(
2539                            parse_quote! {
2540                                #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
2541                            },
2542                            None,
2543                            Some(&next_stmt_id.to_string()),
2544                        );
2545                    }
2546                    BuildersOrCallback::Callback(_, node_callback) => {
2547                        node_callback(self, next_stmt_id);
2548                    }
2549                }
2550
2551                *next_stmt_id += 1;
2552
2553                (reduce_ident, input_location_id)
2554            }
2555
2556            HydroNode::Network {
2557                serialize_fn: serialize_pipeline,
2558                instantiate_fn,
2559                deserialize_fn: deserialize_pipeline,
2560                input,
2561                metadata,
2562                ..
2563            } => {
2564                let (input_ident, input_location_id) =
2565                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2566
2567                let to_id = metadata.location_kind.root().raw_id();
2568
2569                let receiver_stream_ident =
2570                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2571
2572                match builders_or_callback {
2573                    BuildersOrCallback::Builders(graph_builders) => {
2574                        let (sink_expr, source_expr) = match instantiate_fn {
2575                            DebugInstantiate::Building => (
2576                                syn::parse_quote!(DUMMY_SINK),
2577                                syn::parse_quote!(DUMMY_SOURCE),
2578                            ),
2579
2580                            DebugInstantiate::Finalized(finalized) => {
2581                                (finalized.sink.clone(), finalized.source.clone())
2582                            }
2583                        };
2584
2585                        let sender_builder = graph_builders.entry(input_location_id).or_default();
2586                        if let Some(serialize_pipeline) = serialize_pipeline {
2587                            sender_builder.add_dfir(
2588                                parse_quote! {
2589                                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
2590                                },
2591                                None,
2592                                // operator tag separates send and receive, which otherwise have the same next_stmt_id
2593                                Some(&format!("send{}", next_stmt_id)),
2594                            );
2595                        } else {
2596                            sender_builder.add_dfir(
2597                                parse_quote! {
2598                                    #input_ident -> dest_sink(#sink_expr);
2599                                },
2600                                None,
2601                                Some(&format!("send{}", next_stmt_id)),
2602                            );
2603                        }
2604
2605                        let receiver_builder = graph_builders.entry(to_id).or_default();
2606                        if let Some(deserialize_pipeline) = deserialize_pipeline {
2607                            receiver_builder.add_dfir(parse_quote! {
2608                                #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2609                            }, None, Some(&format!("recv{}", next_stmt_id)));
2610                        } else {
2611                            receiver_builder.add_dfir(
2612                                parse_quote! {
2613                                    #receiver_stream_ident = source_stream(#source_expr);
2614                                },
2615                                None,
2616                                Some(&format!("recv{}", next_stmt_id)),
2617                            );
2618                        }
2619                    }
2620                    BuildersOrCallback::Callback(_, node_callback) => {
2621                        node_callback(self, next_stmt_id);
2622                    }
2623                }
2624
2625                *next_stmt_id += 1;
2626
2627                (receiver_stream_ident, to_id)
2628            }
2629
2630            HydroNode::ExternalInput {
2631                instantiate_fn,
2632                deserialize_fn: deserialize_pipeline,
2633                metadata,
2634                ..
2635            } => {
2636                let to_id = metadata.location_kind.root().raw_id();
2637
2638                let receiver_stream_ident =
2639                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2640
2641                match builders_or_callback {
2642                    BuildersOrCallback::Builders(graph_builders) => {
2643                        let (_, source_expr) = match instantiate_fn {
2644                            DebugInstantiate::Building => (
2645                                syn::parse_quote!(DUMMY_SINK),
2646                                syn::parse_quote!(DUMMY_SOURCE),
2647                            ),
2648
2649                            DebugInstantiate::Finalized(finalized) => {
2650                                (finalized.sink.clone(), finalized.source.clone())
2651                            }
2652                        };
2653
2654                        let receiver_builder = graph_builders.entry(to_id).or_default();
2655                        if let Some(deserialize_pipeline) = deserialize_pipeline {
2656                            receiver_builder.add_dfir(parse_quote! {
2657                                #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
2658                            }, None, Some(&format!("recv{}", next_stmt_id)));
2659                        } else {
2660                            receiver_builder.add_dfir(
2661                                parse_quote! {
2662                                    #receiver_stream_ident = source_stream(#source_expr);
2663                                },
2664                                None,
2665                                Some(&format!("recv{}", next_stmt_id)),
2666                            );
2667                        }
2668                    }
2669                    BuildersOrCallback::Callback(_, node_callback) => {
2670                        node_callback(self, next_stmt_id);
2671                    }
2672                }
2673
2674                *next_stmt_id += 1;
2675
2676                (receiver_stream_ident, to_id)
2677            }
2678
2679            HydroNode::Counter {
2680                tag,
2681                duration,
2682                prefix,
2683                input,
2684                ..
2685            } => {
2686                let (input_ident, input_location_id) =
2687                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2688
2689                let counter_ident =
2690                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2691
2692                match builders_or_callback {
2693                    BuildersOrCallback::Builders(graph_builders) => {
2694                        let builder = graph_builders.entry(input_location_id).or_default();
2695                        builder.add_dfir(
2696                            parse_quote! {
2697                                #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
2698                            },
2699                            None,
2700                            Some(&next_stmt_id.to_string()),
2701                        );
2702                    }
2703                    BuildersOrCallback::Callback(_, node_callback) => {
2704                        node_callback(self, next_stmt_id);
2705                    }
2706                }
2707
2708                *next_stmt_id += 1;
2709
2710                (counter_ident, input_location_id)
2711            }
2712        }
2713    }
2714
2715    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
2716        match self {
2717            HydroNode::Placeholder => {
2718                panic!()
2719            }
2720            HydroNode::Source { source, .. } => match source {
2721                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
2722                HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
2723            },
2724            HydroNode::CycleSource { .. }
2725            | HydroNode::Tee { .. }
2726            | HydroNode::Persist { .. }
2727            | HydroNode::Unpersist { .. }
2728            | HydroNode::Delta { .. }
2729            | HydroNode::Chain { .. }
2730            | HydroNode::ChainFirst { .. }
2731            | HydroNode::CrossProduct { .. }
2732            | HydroNode::CrossSingleton { .. }
2733            | HydroNode::ResolveFutures { .. }
2734            | HydroNode::ResolveFuturesOrdered { .. }
2735            | HydroNode::Join { .. }
2736            | HydroNode::Difference { .. }
2737            | HydroNode::AntiJoin { .. }
2738            | HydroNode::DeferTick { .. }
2739            | HydroNode::Enumerate { .. }
2740            | HydroNode::Unique { .. }
2741            | HydroNode::Sort { .. } => {}
2742            HydroNode::Map { f, .. }
2743            | HydroNode::FlatMap { f, .. }
2744            | HydroNode::Filter { f, .. }
2745            | HydroNode::FilterMap { f, .. }
2746            | HydroNode::Inspect { f, .. }
2747            | HydroNode::Reduce { f, .. }
2748            | HydroNode::ReduceKeyed { f, .. }
2749            | HydroNode::ReduceKeyedWatermark { f, .. } => {
2750                transform(f);
2751            }
2752            HydroNode::Fold { init, acc, .. }
2753            | HydroNode::Scan { init, acc, .. }
2754            | HydroNode::FoldKeyed { init, acc, .. } => {
2755                transform(init);
2756                transform(acc);
2757            }
2758            HydroNode::Network {
2759                serialize_fn,
2760                deserialize_fn,
2761                ..
2762            } => {
2763                if let Some(serialize_fn) = serialize_fn {
2764                    transform(serialize_fn);
2765                }
2766                if let Some(deserialize_fn) = deserialize_fn {
2767                    transform(deserialize_fn);
2768                }
2769            }
2770            HydroNode::ExternalInput { deserialize_fn, .. } => {
2771                if let Some(deserialize_fn) = deserialize_fn {
2772                    transform(deserialize_fn);
2773                }
2774            }
2775            HydroNode::Counter { duration, .. } => {
2776                transform(duration);
2777            }
2778        }
2779    }
2780
2781    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
2782        &self.metadata().op
2783    }
2784
2785    pub fn metadata(&self) -> &HydroIrMetadata {
2786        match self {
2787            HydroNode::Placeholder => {
2788                panic!()
2789            }
2790            HydroNode::Source { metadata, .. } => metadata,
2791            HydroNode::CycleSource { metadata, .. } => metadata,
2792            HydroNode::Tee { metadata, .. } => metadata,
2793            HydroNode::Persist { metadata, .. } => metadata,
2794            HydroNode::Unpersist { metadata, .. } => metadata,
2795            HydroNode::Delta { metadata, .. } => metadata,
2796            HydroNode::Chain { metadata, .. } => metadata,
2797            HydroNode::ChainFirst { metadata, .. } => metadata,
2798            HydroNode::CrossProduct { metadata, .. } => metadata,
2799            HydroNode::CrossSingleton { metadata, .. } => metadata,
2800            HydroNode::Join { metadata, .. } => metadata,
2801            HydroNode::Difference { metadata, .. } => metadata,
2802            HydroNode::AntiJoin { metadata, .. } => metadata,
2803            HydroNode::ResolveFutures { metadata, .. } => metadata,
2804            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2805            HydroNode::Map { metadata, .. } => metadata,
2806            HydroNode::FlatMap { metadata, .. } => metadata,
2807            HydroNode::Filter { metadata, .. } => metadata,
2808            HydroNode::FilterMap { metadata, .. } => metadata,
2809            HydroNode::DeferTick { metadata, .. } => metadata,
2810            HydroNode::Enumerate { metadata, .. } => metadata,
2811            HydroNode::Inspect { metadata, .. } => metadata,
2812            HydroNode::Unique { metadata, .. } => metadata,
2813            HydroNode::Sort { metadata, .. } => metadata,
2814            HydroNode::Scan { metadata, .. } => metadata,
2815            HydroNode::Fold { metadata, .. } => metadata,
2816            HydroNode::FoldKeyed { metadata, .. } => metadata,
2817            HydroNode::Reduce { metadata, .. } => metadata,
2818            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2819            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2820            HydroNode::ExternalInput { metadata, .. } => metadata,
2821            HydroNode::Network { metadata, .. } => metadata,
2822            HydroNode::Counter { metadata, .. } => metadata,
2823        }
2824    }
2825
2826    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
2827        &mut self.metadata_mut().op
2828    }
2829
2830    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2831        match self {
2832            HydroNode::Placeholder => {
2833                panic!()
2834            }
2835            HydroNode::Source { metadata, .. } => metadata,
2836            HydroNode::CycleSource { metadata, .. } => metadata,
2837            HydroNode::Tee { metadata, .. } => metadata,
2838            HydroNode::Persist { metadata, .. } => metadata,
2839            HydroNode::Unpersist { metadata, .. } => metadata,
2840            HydroNode::Delta { metadata, .. } => metadata,
2841            HydroNode::Chain { metadata, .. } => metadata,
2842            HydroNode::ChainFirst { metadata, .. } => metadata,
2843            HydroNode::CrossProduct { metadata, .. } => metadata,
2844            HydroNode::CrossSingleton { metadata, .. } => metadata,
2845            HydroNode::Join { metadata, .. } => metadata,
2846            HydroNode::Difference { metadata, .. } => metadata,
2847            HydroNode::AntiJoin { metadata, .. } => metadata,
2848            HydroNode::ResolveFutures { metadata, .. } => metadata,
2849            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2850            HydroNode::Map { metadata, .. } => metadata,
2851            HydroNode::FlatMap { metadata, .. } => metadata,
2852            HydroNode::Filter { metadata, .. } => metadata,
2853            HydroNode::FilterMap { metadata, .. } => metadata,
2854            HydroNode::DeferTick { metadata, .. } => metadata,
2855            HydroNode::Enumerate { metadata, .. } => metadata,
2856            HydroNode::Inspect { metadata, .. } => metadata,
2857            HydroNode::Unique { metadata, .. } => metadata,
2858            HydroNode::Sort { metadata, .. } => metadata,
2859            HydroNode::Scan { metadata, .. } => metadata,
2860            HydroNode::Fold { metadata, .. } => metadata,
2861            HydroNode::FoldKeyed { metadata, .. } => metadata,
2862            HydroNode::Reduce { metadata, .. } => metadata,
2863            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2864            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
2865            HydroNode::ExternalInput { metadata, .. } => metadata,
2866            HydroNode::Network { metadata, .. } => metadata,
2867            HydroNode::Counter { metadata, .. } => metadata,
2868        }
2869    }
2870
2871    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
2872        match self {
2873            HydroNode::Placeholder => {
2874                panic!()
2875            }
2876            HydroNode::Source { .. }
2877            | HydroNode::ExternalInput { .. }
2878            | HydroNode::CycleSource { .. } // CycleSource and Tee should calculate input metadata in separate special ways
2879            | HydroNode::Tee { .. } => {
2880                vec![]
2881            }
2882            HydroNode::Persist { inner, .. }
2883            | HydroNode::Unpersist { inner, .. }
2884            | HydroNode::Delta { inner, .. } => {
2885                vec![inner.metadata()]
2886            }
2887            HydroNode::Chain { first, second, .. } => {
2888                vec![first.metadata(), second.metadata()]
2889            }
2890            HydroNode::ChainFirst { first, second, .. } => {
2891                vec![first.metadata(), second.metadata()]
2892            }
2893            HydroNode::CrossProduct { left, right, .. }
2894            | HydroNode::CrossSingleton { left, right, .. }
2895            | HydroNode::Join { left, right, .. } => {
2896                vec![left.metadata(), right.metadata()]
2897            }
2898            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2899                vec![pos.metadata(), neg.metadata()]
2900            }
2901            HydroNode::Map { input, .. }
2902            | HydroNode::FlatMap { input, .. }
2903            | HydroNode::Filter { input, .. }
2904            | HydroNode::FilterMap { input, .. }
2905            | HydroNode::Sort { input, .. }
2906            | HydroNode::DeferTick { input, .. }
2907            | HydroNode::Enumerate { input, .. }
2908            | HydroNode::Inspect { input, .. }
2909            | HydroNode::Unique { input, .. }
2910            | HydroNode::Network { input, .. }
2911            | HydroNode::Counter { input, .. }
2912            | HydroNode::ResolveFutures { input, .. }
2913            | HydroNode::ResolveFuturesOrdered { input, .. } => {
2914                vec![input.metadata()]
2915            }
2916            HydroNode::Fold { input, .. }
2917            | HydroNode::FoldKeyed { input, .. }
2918            | HydroNode::Reduce { input, .. }
2919            | HydroNode::ReduceKeyed { input, .. }
2920            | HydroNode::Scan { input, .. } => {
2921                // Skip persist before fold/reduce
2922                if let HydroNode::Persist { inner, .. } = input.as_ref() {
2923                    vec![inner.metadata()]
2924                } else {
2925                    vec![input.metadata()]
2926                }
2927            }
2928            HydroNode::ReduceKeyedWatermark { input, watermark, .. } => {
2929                // Skip persist before fold/reduce
2930                if let HydroNode::Persist { inner, .. } = input.as_ref() {
2931                    vec![inner.metadata(), watermark.metadata()]
2932                } else {
2933                    vec![input.metadata(), watermark.metadata()]
2934                }
2935            }
2936        }
2937    }
2938
2939    pub fn print_root(&self) -> String {
2940        match self {
2941            HydroNode::Placeholder => {
2942                panic!()
2943            }
2944            HydroNode::Source { source, .. } => format!("Source({:?})", source),
2945            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
2946            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
2947            HydroNode::Persist { .. } => "Persist()".to_string(),
2948            HydroNode::Unpersist { .. } => "Unpersist()".to_string(),
2949            HydroNode::Delta { .. } => "Delta()".to_string(),
2950            HydroNode::Chain { first, second, .. } => {
2951                format!("Chain({}, {})", first.print_root(), second.print_root())
2952            }
2953            HydroNode::ChainFirst { first, second, .. } => {
2954                format!(
2955                    "ChainFirst({}, {})",
2956                    first.print_root(),
2957                    second.print_root()
2958                )
2959            }
2960            HydroNode::CrossProduct { left, right, .. } => {
2961                format!(
2962                    "CrossProduct({}, {})",
2963                    left.print_root(),
2964                    right.print_root()
2965                )
2966            }
2967            HydroNode::CrossSingleton { left, right, .. } => {
2968                format!(
2969                    "CrossSingleton({}, {})",
2970                    left.print_root(),
2971                    right.print_root()
2972                )
2973            }
2974            HydroNode::Join { left, right, .. } => {
2975                format!("Join({}, {})", left.print_root(), right.print_root())
2976            }
2977            HydroNode::Difference { pos, neg, .. } => {
2978                format!("Difference({}, {})", pos.print_root(), neg.print_root())
2979            }
2980            HydroNode::AntiJoin { pos, neg, .. } => {
2981                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
2982            }
2983            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
2984            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
2985            HydroNode::Map { f, .. } => format!("Map({:?})", f),
2986            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
2987            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
2988            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
2989            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
2990            HydroNode::Enumerate { is_static, .. } => format!("Enumerate({:?})", is_static),
2991            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
2992            HydroNode::Unique { .. } => "Unique()".to_string(),
2993            HydroNode::Sort { .. } => "Sort()".to_string(),
2994            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
2995            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
2996            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
2997            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
2998            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
2999            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3000            HydroNode::Network { .. } => "Network()".to_string(),
3001            HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3002            HydroNode::Counter { tag, duration, .. } => {
3003                format!("Counter({:?}, {:?})", tag, duration)
3004            }
3005        }
3006    }
3007}
3008
3009#[cfg(feature = "build")]
3010fn instantiate_network<'a, D>(
3011    from_location: &LocationId,
3012    to_location: &LocationId,
3013    processes: &HashMap<usize, D::Process>,
3014    clusters: &HashMap<usize, D::Cluster>,
3015    compile_env: &D::CompileEnv,
3016) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3017where
3018    D: Deploy<'a>,
3019{
3020    let ((sink, source), connect_fn) = match (from_location, to_location) {
3021        (LocationId::Process(from), LocationId::Process(to)) => {
3022            let from_node = processes
3023                .get(from)
3024                .unwrap_or_else(|| {
3025                    panic!("A process used in the graph was not instantiated: {}", from)
3026                })
3027                .clone();
3028            let to_node = processes
3029                .get(to)
3030                .unwrap_or_else(|| {
3031                    panic!("A process used in the graph was not instantiated: {}", to)
3032                })
3033                .clone();
3034
3035            let sink_port = D::allocate_process_port(&from_node);
3036            let source_port = D::allocate_process_port(&to_node);
3037
3038            (
3039                D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3040                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3041            )
3042        }
3043        (LocationId::Process(from), LocationId::Cluster(to)) => {
3044            let from_node = processes
3045                .get(from)
3046                .unwrap_or_else(|| {
3047                    panic!("A process used in the graph was not instantiated: {}", from)
3048                })
3049                .clone();
3050            let to_node = clusters
3051                .get(to)
3052                .unwrap_or_else(|| {
3053                    panic!("A cluster used in the graph was not instantiated: {}", to)
3054                })
3055                .clone();
3056
3057            let sink_port = D::allocate_process_port(&from_node);
3058            let source_port = D::allocate_cluster_port(&to_node);
3059
3060            (
3061                D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3062                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3063            )
3064        }
3065        (LocationId::Cluster(from), LocationId::Process(to)) => {
3066            let from_node = clusters
3067                .get(from)
3068                .unwrap_or_else(|| {
3069                    panic!("A cluster used in the graph was not instantiated: {}", from)
3070                })
3071                .clone();
3072            let to_node = processes
3073                .get(to)
3074                .unwrap_or_else(|| {
3075                    panic!("A process used in the graph was not instantiated: {}", to)
3076                })
3077                .clone();
3078
3079            let sink_port = D::allocate_cluster_port(&from_node);
3080            let source_port = D::allocate_process_port(&to_node);
3081
3082            (
3083                D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3084                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
3085            )
3086        }
3087        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
3088            let from_node = clusters
3089                .get(from)
3090                .unwrap_or_else(|| {
3091                    panic!("A cluster used in the graph was not instantiated: {}", from)
3092                })
3093                .clone();
3094            let to_node = clusters
3095                .get(to)
3096                .unwrap_or_else(|| {
3097                    panic!("A cluster used in the graph was not instantiated: {}", to)
3098                })
3099                .clone();
3100
3101            let sink_port = D::allocate_cluster_port(&from_node);
3102            let source_port = D::allocate_cluster_port(&to_node);
3103
3104            (
3105                D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3106                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
3107            )
3108        }
3109        (LocationId::Tick(_, _), _) => panic!(),
3110        (_, LocationId::Tick(_, _)) => panic!(),
3111    };
3112    (sink, source, connect_fn)
3113}
3114
#[cfg(test)]
mod test {
    // Unit tests for this module: two layout checks that pin the byte size of
    // the IR enums, and three checks of `simplify_q_macro`.

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// `HydroNode` must occupy exactly 248 bytes; a layout change has to
    /// update this number deliberately.
    #[test]
    fn hydro_node_size() {
        let node_bytes = std::mem::size_of::<HydroNode>();
        assert_eq!(node_bytes, 248);
    }

    /// `HydroRoot` must occupy exactly 176 bytes; a layout change has to
    /// update this number deliberately.
    #[test]
    fn hydro_root_size() {
        let root_bytes = std::mem::size_of::<HydroRoot>();
        assert_eq!(root_bytes, 176);
    }

    /// An expression that contains no `q!` expansion comes back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let parsed = syn::parse_str::<syn::Expr>("x + y").unwrap();
        let simplified = simplify_q_macro(parsed.clone());
        assert_eq!(simplified, parsed);
    }

    /// Feeds a spliced single-argument closure through `simplify_q_macro` and
    /// compares the printed tokens against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // A pared-down stand-in for what a real stageleft call produces.
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        // The visitor is expected to recognise the
        // stageleft::runtime_support::fn_* pattern and collapse it to q!(...).
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same snapshot check, but the quoted code is a block that ends in a
    /// closure, so the quoted text does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}