Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::fmt::{Debug, Display};
5use std::hash::{Hash, Hasher};
6use std::ops::Deref;
7use std::rc::Rc;
8
9#[cfg(feature = "build")]
10use dfir_lang::graph::FlatGraphBuilder;
11#[cfg(feature = "build")]
12use proc_macro2::Span;
13use proc_macro2::TokenStream;
14use quote::ToTokens;
15#[cfg(feature = "build")]
16use quote::quote;
17#[cfg(feature = "build")]
18use slotmap::{SecondaryMap, SparseSecondaryMap};
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::compile::builder::ExternalPortId;
25#[cfg(feature = "build")]
26use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
27use crate::location::dynamic::LocationId;
28use crate::location::{LocationKey, NetworkHint};
29
30pub mod backtrace;
31use backtrace::Backtrace;
32
33/// Wrapper that displays only the tokens of a parsed expr.
34///
35/// Boxes `syn::Type` which is ~240 bytes.
36#[derive(Clone, Hash)]
37pub struct DebugExpr(pub Box<syn::Expr>);
38
39impl From<syn::Expr> for DebugExpr {
40    fn from(expr: syn::Expr) -> Self {
41        Self(Box::new(expr))
42    }
43}
44
45impl Deref for DebugExpr {
46    type Target = syn::Expr;
47
48    fn deref(&self) -> &Self::Target {
49        &self.0
50    }
51}
52
53impl ToTokens for DebugExpr {
54    fn to_tokens(&self, tokens: &mut TokenStream) {
55        self.0.to_tokens(tokens);
56    }
57}
58
59impl Debug for DebugExpr {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(f, "{}", self.0.to_token_stream())
62    }
63}
64
65impl Display for DebugExpr {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        let original = self.0.as_ref().clone();
68        let simplified = simplify_q_macro(original);
69
70        // For now, just use quote formatting without trying to parse as a statement
71        // This avoids the syn::parse_quote! issues entirely
72        write!(f, "q!({})", quote::quote!(#simplified))
73    }
74}
75
76/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
77fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
78    // Try to parse the token string as a syn::Expr
79    // Use a visitor to simplify q! macro expansions
80    let mut simplifier = QMacroSimplifier::new();
81    simplifier.visit_expr_mut(&mut expr);
82
83    // If we found and simplified a q! macro, return the simplified version
84    if let Some(simplified) = simplifier.simplified_result {
85        simplified
86    } else {
87        expr
88    }
89}
90
91/// AST visitor that simplifies q! macro expansions
92#[derive(Default)]
93pub struct QMacroSimplifier {
94    pub simplified_result: Option<syn::Expr>,
95}
96
97impl QMacroSimplifier {
98    pub fn new() -> Self {
99        Self::default()
100    }
101}
102
103impl VisitMut for QMacroSimplifier {
104    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
105        // Check if we already found a result to avoid further processing
106        if self.simplified_result.is_some() {
107            return;
108        }
109
110        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
111            // Look for calls to stageleft::runtime_support::fn*
112            && self.is_stageleft_runtime_support_call(&path_expr.path)
113            // Try to extract the closure from the arguments
114            && let Some(closure) = self.extract_closure_from_args(&call.args)
115        {
116            self.simplified_result = Some(closure);
117            return;
118        }
119
120        // Continue visiting child expressions using the default implementation
121        // Use the default visitor to avoid infinite recursion
122        syn::visit_mut::visit_expr_mut(self, expr);
123    }
124}
125
126impl QMacroSimplifier {
127    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
128        // Check if this is a call to stageleft::runtime_support::fn*
129        if let Some(last_segment) = path.segments.last() {
130            let fn_name = last_segment.ident.to_string();
131            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
132            fn_name.contains("_type_hint")
133                && path.segments.len() > 2
134                && path.segments[0].ident == "stageleft"
135                && path.segments[1].ident == "runtime_support"
136        } else {
137            false
138        }
139    }
140
141    fn extract_closure_from_args(
142        &self,
143        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
144    ) -> Option<syn::Expr> {
145        // Look through the arguments for a closure expression
146        for arg in args {
147            if let syn::Expr::Closure(_) = arg {
148                return Some(arg.clone());
149            }
150            // Also check for closures nested in other expressions (like blocks)
151            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
152                return Some(closure_expr);
153            }
154        }
155        None
156    }
157
158    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
159        let mut visitor = ClosureFinder {
160            found_closure: None,
161            prefer_inner_blocks: true,
162        };
163        visitor.visit_expr(expr);
164        visitor.found_closure
165    }
166}
167
168/// Visitor that finds closures in expressions with special block handling
169struct ClosureFinder {
170    found_closure: Option<syn::Expr>,
171    prefer_inner_blocks: bool,
172}
173
174impl<'ast> Visit<'ast> for ClosureFinder {
175    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
176        // If we already found a closure, don't continue searching
177        if self.found_closure.is_some() {
178            return;
179        }
180
181        match expr {
182            syn::Expr::Closure(_) => {
183                self.found_closure = Some(expr.clone());
184            }
185            syn::Expr::Block(block) if self.prefer_inner_blocks => {
186                // Special handling for blocks - look for inner blocks that contain closures
187                for stmt in &block.block.stmts {
188                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
189                        && let syn::Expr::Block(_) = stmt_expr
190                    {
191                        // Check if this nested block contains a closure
192                        let mut inner_visitor = ClosureFinder {
193                            found_closure: None,
194                            prefer_inner_blocks: false, // Avoid infinite recursion
195                        };
196                        inner_visitor.visit_expr(stmt_expr);
197                        if inner_visitor.found_closure.is_some() {
198                            // Found a closure in an inner block, return that block
199                            self.found_closure = Some(stmt_expr.clone());
200                            return;
201                        }
202                    }
203                }
204
205                // If no inner block with closure found, continue with normal visitation
206                visit::visit_expr(self, expr);
207
208                // If we found a closure, just return the closure itself, not the whole block
209                // unless we're in the special case where we want the containing block
210                if self.found_closure.is_some() {
211                    // The closure was found during visitation, no need to wrap in block
212                }
213            }
214            _ => {
215                // Use default visitor behavior for all other expressions
216                visit::visit_expr(self, expr);
217            }
218        }
219    }
220}
221
222/// Debug displays the type's tokens.
223///
224/// Boxes `syn::Type` which is ~320 bytes.
225#[derive(Clone, PartialEq, Eq, Hash)]
226pub struct DebugType(pub Box<syn::Type>);
227
228impl From<syn::Type> for DebugType {
229    fn from(t: syn::Type) -> Self {
230        Self(Box::new(t))
231    }
232}
233
234impl Deref for DebugType {
235    type Target = syn::Type;
236
237    fn deref(&self) -> &Self::Target {
238        &self.0
239    }
240}
241
242impl ToTokens for DebugType {
243    fn to_tokens(&self, tokens: &mut TokenStream) {
244        self.0.to_tokens(tokens);
245    }
246}
247
248impl Debug for DebugType {
249    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250        write!(f, "{}", self.0.to_token_stream())
251    }
252}
253
254pub enum DebugInstantiate {
255    Building,
256    Finalized(Box<DebugInstantiateFinalized>),
257}
258
259#[cfg_attr(
260    not(feature = "build"),
261    expect(
262        dead_code,
263        reason = "sink, source unused without `feature = \"build\"`."
264    )
265)]
266pub struct DebugInstantiateFinalized {
267    sink: syn::Expr,
268    source: syn::Expr,
269    connect_fn: Option<Box<dyn FnOnce()>>,
270}
271
272impl From<DebugInstantiateFinalized> for DebugInstantiate {
273    fn from(f: DebugInstantiateFinalized) -> Self {
274        Self::Finalized(Box::new(f))
275    }
276}
277
278impl Debug for DebugInstantiate {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        write!(f, "<network instantiate>")
281    }
282}
283
284impl Hash for DebugInstantiate {
285    fn hash<H: Hasher>(&self, _state: &mut H) {
286        // Do nothing
287    }
288}
289
290impl Clone for DebugInstantiate {
291    fn clone(&self) -> Self {
292        match self {
293            DebugInstantiate::Building => DebugInstantiate::Building,
294            DebugInstantiate::Finalized(_) => {
295                panic!("DebugInstantiate::Finalized should not be cloned")
296            }
297        }
298    }
299}
300
301/// A source in a Hydro graph, where data enters the graph.
302#[derive(Debug, Hash, Clone)]
303pub enum HydroSource {
304    Stream(DebugExpr),
305    ExternalNetwork(),
306    Iter(DebugExpr),
307    Spin(),
308    ClusterMembers(LocationId),
309    Embedded(syn::Ident),
310}
311
312#[cfg(feature = "build")]
313/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
314/// and simulations.
315///
316/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
317/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
318pub trait DfirBuilder {
319    /// Whether the representation of singletons should include intermediate states.
320    fn singleton_intermediates(&self) -> bool;
321
322    /// Gets the DFIR builder for the given location, creating it if necessary.
323    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
324
325    fn batch(
326        &mut self,
327        in_ident: syn::Ident,
328        in_location: &LocationId,
329        in_kind: &CollectionKind,
330        out_ident: &syn::Ident,
331        out_location: &LocationId,
332        op_meta: &HydroIrOpMetadata,
333    );
334    fn yield_from_tick(
335        &mut self,
336        in_ident: syn::Ident,
337        in_location: &LocationId,
338        in_kind: &CollectionKind,
339        out_ident: &syn::Ident,
340        out_location: &LocationId,
341    );
342
343    fn begin_atomic(
344        &mut self,
345        in_ident: syn::Ident,
346        in_location: &LocationId,
347        in_kind: &CollectionKind,
348        out_ident: &syn::Ident,
349        out_location: &LocationId,
350        op_meta: &HydroIrOpMetadata,
351    );
352    fn end_atomic(
353        &mut self,
354        in_ident: syn::Ident,
355        in_location: &LocationId,
356        in_kind: &CollectionKind,
357        out_ident: &syn::Ident,
358    );
359
360    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
361    fn observe_nondet(
362        &mut self,
363        trusted: bool,
364        location: &LocationId,
365        in_ident: syn::Ident,
366        in_kind: &CollectionKind,
367        out_ident: &syn::Ident,
368        out_kind: &CollectionKind,
369        op_meta: &HydroIrOpMetadata,
370    );
371
372    #[expect(clippy::too_many_arguments, reason = "TODO")]
373    fn create_network(
374        &mut self,
375        from: &LocationId,
376        to: &LocationId,
377        input_ident: syn::Ident,
378        out_ident: &syn::Ident,
379        serialize: Option<&DebugExpr>,
380        sink: syn::Expr,
381        source: syn::Expr,
382        deserialize: Option<&DebugExpr>,
383        tag_id: usize,
384    );
385
386    fn create_external_source(
387        &mut self,
388        on: &LocationId,
389        source_expr: syn::Expr,
390        out_ident: &syn::Ident,
391        deserialize: Option<&DebugExpr>,
392        tag_id: usize,
393    );
394
395    fn create_external_output(
396        &mut self,
397        on: &LocationId,
398        sink_expr: syn::Expr,
399        input_ident: &syn::Ident,
400        serialize: Option<&DebugExpr>,
401        tag_id: usize,
402    );
403}
404
405#[cfg(feature = "build")]
406impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
407    fn singleton_intermediates(&self) -> bool {
408        false
409    }
410
411    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
412        self.entry(location.root().key())
413            .expect("location was removed")
414            .or_default()
415    }
416
417    fn batch(
418        &mut self,
419        in_ident: syn::Ident,
420        in_location: &LocationId,
421        in_kind: &CollectionKind,
422        out_ident: &syn::Ident,
423        _out_location: &LocationId,
424        _op_meta: &HydroIrOpMetadata,
425    ) {
426        let builder = self.get_dfir_mut(in_location.root());
427        if in_kind.is_bounded()
428            && matches!(
429                in_kind,
430                CollectionKind::Singleton { .. }
431                    | CollectionKind::Optional { .. }
432                    | CollectionKind::KeyedSingleton { .. }
433            )
434        {
435            assert!(in_location.is_top_level());
436            builder.add_dfir(
437                parse_quote! {
438                    #out_ident = #in_ident -> persist::<'static>();
439                },
440                None,
441                None,
442            );
443        } else {
444            builder.add_dfir(
445                parse_quote! {
446                    #out_ident = #in_ident;
447                },
448                None,
449                None,
450            );
451        }
452    }
453
454    fn yield_from_tick(
455        &mut self,
456        in_ident: syn::Ident,
457        in_location: &LocationId,
458        _in_kind: &CollectionKind,
459        out_ident: &syn::Ident,
460        _out_location: &LocationId,
461    ) {
462        let builder = self.get_dfir_mut(in_location.root());
463        builder.add_dfir(
464            parse_quote! {
465                #out_ident = #in_ident;
466            },
467            None,
468            None,
469        );
470    }
471
472    fn begin_atomic(
473        &mut self,
474        in_ident: syn::Ident,
475        in_location: &LocationId,
476        _in_kind: &CollectionKind,
477        out_ident: &syn::Ident,
478        _out_location: &LocationId,
479        _op_meta: &HydroIrOpMetadata,
480    ) {
481        let builder = self.get_dfir_mut(in_location.root());
482        builder.add_dfir(
483            parse_quote! {
484                #out_ident = #in_ident;
485            },
486            None,
487            None,
488        );
489    }
490
491    fn end_atomic(
492        &mut self,
493        in_ident: syn::Ident,
494        in_location: &LocationId,
495        _in_kind: &CollectionKind,
496        out_ident: &syn::Ident,
497    ) {
498        let builder = self.get_dfir_mut(in_location.root());
499        builder.add_dfir(
500            parse_quote! {
501                #out_ident = #in_ident;
502            },
503            None,
504            None,
505        );
506    }
507
508    fn observe_nondet(
509        &mut self,
510        _trusted: bool,
511        location: &LocationId,
512        in_ident: syn::Ident,
513        _in_kind: &CollectionKind,
514        out_ident: &syn::Ident,
515        _out_kind: &CollectionKind,
516        _op_meta: &HydroIrOpMetadata,
517    ) {
518        let builder = self.get_dfir_mut(location);
519        builder.add_dfir(
520            parse_quote! {
521                #out_ident = #in_ident;
522            },
523            None,
524            None,
525        );
526    }
527
528    fn create_network(
529        &mut self,
530        from: &LocationId,
531        to: &LocationId,
532        input_ident: syn::Ident,
533        out_ident: &syn::Ident,
534        serialize: Option<&DebugExpr>,
535        sink: syn::Expr,
536        source: syn::Expr,
537        deserialize: Option<&DebugExpr>,
538        tag_id: usize,
539    ) {
540        let sender_builder = self.get_dfir_mut(from);
541        if let Some(serialize_pipeline) = serialize {
542            sender_builder.add_dfir(
543                parse_quote! {
544                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
545                },
546                None,
547                // operator tag separates send and receive, which otherwise have the same next_stmt_id
548                Some(&format!("send{}", tag_id)),
549            );
550        } else {
551            sender_builder.add_dfir(
552                parse_quote! {
553                    #input_ident -> dest_sink(#sink);
554                },
555                None,
556                Some(&format!("send{}", tag_id)),
557            );
558        }
559
560        let receiver_builder = self.get_dfir_mut(to);
561        if let Some(deserialize_pipeline) = deserialize {
562            receiver_builder.add_dfir(
563                parse_quote! {
564                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
565                },
566                None,
567                Some(&format!("recv{}", tag_id)),
568            );
569        } else {
570            receiver_builder.add_dfir(
571                parse_quote! {
572                    #out_ident = source_stream(#source);
573                },
574                None,
575                Some(&format!("recv{}", tag_id)),
576            );
577        }
578    }
579
580    fn create_external_source(
581        &mut self,
582        on: &LocationId,
583        source_expr: syn::Expr,
584        out_ident: &syn::Ident,
585        deserialize: Option<&DebugExpr>,
586        tag_id: usize,
587    ) {
588        let receiver_builder = self.get_dfir_mut(on);
589        if let Some(deserialize_pipeline) = deserialize {
590            receiver_builder.add_dfir(
591                parse_quote! {
592                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
593                },
594                None,
595                Some(&format!("recv{}", tag_id)),
596            );
597        } else {
598            receiver_builder.add_dfir(
599                parse_quote! {
600                    #out_ident = source_stream(#source_expr);
601                },
602                None,
603                Some(&format!("recv{}", tag_id)),
604            );
605        }
606    }
607
608    fn create_external_output(
609        &mut self,
610        on: &LocationId,
611        sink_expr: syn::Expr,
612        input_ident: &syn::Ident,
613        serialize: Option<&DebugExpr>,
614        tag_id: usize,
615    ) {
616        let sender_builder = self.get_dfir_mut(on);
617        if let Some(serialize_fn) = serialize {
618            sender_builder.add_dfir(
619                parse_quote! {
620                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
621                },
622                None,
623                // operator tag separates send and receive, which otherwise have the same next_stmt_id
624                Some(&format!("send{}", tag_id)),
625            );
626        } else {
627            sender_builder.add_dfir(
628                parse_quote! {
629                    #input_ident -> dest_sink(#sink_expr);
630                },
631                None,
632                Some(&format!("send{}", tag_id)),
633            );
634        }
635    }
636}
637
638#[cfg(feature = "build")]
639pub enum BuildersOrCallback<'a, L, N>
640where
641    L: FnMut(&mut HydroRoot, &mut usize),
642    N: FnMut(&mut HydroNode, &mut usize),
643{
644    Builders(&'a mut dyn DfirBuilder),
645    Callback(L, N),
646}
647
648/// An root in a Hydro graph, which is an pipeline that doesn't emit
649/// any downstream values. Traversals over the dataflow graph and
650/// generating DFIR IR start from roots.
651#[derive(Debug, Hash)]
652pub enum HydroRoot {
653    ForEach {
654        f: DebugExpr,
655        input: Box<HydroNode>,
656        op_metadata: HydroIrOpMetadata,
657    },
658    SendExternal {
659        to_external_key: LocationKey,
660        to_port_id: ExternalPortId,
661        to_many: bool,
662        unpaired: bool,
663        serialize_fn: Option<DebugExpr>,
664        instantiate_fn: DebugInstantiate,
665        input: Box<HydroNode>,
666        op_metadata: HydroIrOpMetadata,
667    },
668    DestSink {
669        sink: DebugExpr,
670        input: Box<HydroNode>,
671        op_metadata: HydroIrOpMetadata,
672    },
673    CycleSink {
674        ident: syn::Ident,
675        input: Box<HydroNode>,
676        op_metadata: HydroIrOpMetadata,
677    },
678    EmbeddedOutput {
679        ident: syn::Ident,
680        input: Box<HydroNode>,
681        op_metadata: HydroIrOpMetadata,
682    },
683}
684
685impl HydroRoot {
686    #[cfg(feature = "build")]
687    pub fn compile_network<'a, D>(
688        &mut self,
689        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
690        seen_tees: &mut SeenTees,
691        processes: &SparseSecondaryMap<LocationKey, D::Process>,
692        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
693        externals: &SparseSecondaryMap<LocationKey, D::External>,
694        env: &mut D::InstantiateEnv,
695    ) where
696        D: Deploy<'a>,
697    {
698        let refcell_extra_stmts = RefCell::new(extra_stmts);
699        let refcell_env = RefCell::new(env);
700        self.transform_bottom_up(
701            &mut |l| {
702                if let HydroRoot::SendExternal {
703                    input,
704                    to_external_key,
705                    to_port_id,
706                    to_many,
707                    unpaired,
708                    instantiate_fn,
709                    ..
710                } = l
711                {
712                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
713                        DebugInstantiate::Building => {
714                            let to_node = externals
715                                .get(*to_external_key)
716                                .unwrap_or_else(|| {
717                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
718                                })
719                                .clone();
720
721                            match input.metadata().location_id.root() {
722                                &LocationId::Process(process_key) => {
723                                    if *to_many {
724                                        (
725                                            (
726                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
727                                                parse_quote!(DUMMY),
728                                            ),
729                                            Box::new(|| {}) as Box<dyn FnOnce()>,
730                                        )
731                                    } else {
732                                        let from_node = processes
733                                            .get(process_key)
734                                            .unwrap_or_else(|| {
735                                                panic!("A process used in the graph was not instantiated: {}", process_key)
736                                            })
737                                            .clone();
738
739                                        let sink_port = from_node.next_port();
740                                        let source_port = to_node.next_port();
741
742                                        if *unpaired {
743                                            use stageleft::quote_type;
744                                            use tokio_util::codec::LengthDelimitedCodec;
745
746                                            to_node.register(*to_port_id, source_port.clone());
747
748                                            let _ = D::e2o_source(
749                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
750                                                &to_node, &source_port,
751                                                &from_node, &sink_port,
752                                                &quote_type::<LengthDelimitedCodec>(),
753                                                format!("{}_{}", *to_external_key, *to_port_id)
754                                            );
755                                        }
756
757                                        (
758                                            (
759                                                D::o2e_sink(
760                                                    &from_node,
761                                                    &sink_port,
762                                                    &to_node,
763                                                    &source_port,
764                                                    format!("{}_{}", *to_external_key, *to_port_id)
765                                                ),
766                                                parse_quote!(DUMMY),
767                                            ),
768                                            if *unpaired {
769                                                D::e2o_connect(
770                                                    &to_node,
771                                                    &source_port,
772                                                    &from_node,
773                                                    &sink_port,
774                                                    *to_many,
775                                                    NetworkHint::Auto,
776                                                )
777                                            } else {
778                                                Box::new(|| {}) as Box<dyn FnOnce()>
779                                            },
780                                        )
781                                    }
782                                }
783                                LocationId::Cluster(_) => todo!(),
784                                _ => panic!()
785                            }
786                        },
787
788                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
789                    };
790
791                    *instantiate_fn = DebugInstantiateFinalized {
792                        sink: sink_expr,
793                        source: source_expr,
794                        connect_fn: Some(connect_fn),
795                    }
796                    .into();
797                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
798                    let element_type = match &input.metadata().collection_kind {
799                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
800                        _ => panic!("Embedded output must have Stream collection kind"),
801                    };
802                    let location_key = match input.metadata().location_id.root() {
803                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
804                        _ => panic!("Embedded output must be on a process or cluster"),
805                    };
806                    D::register_embedded_output(
807                        &mut refcell_env.borrow_mut(),
808                        location_key,
809                        ident,
810                        &element_type,
811                    );
812                }
813            },
814            &mut |n| {
815                if let HydroNode::Network {
816                    input,
817                    instantiate_fn,
818                    metadata,
819                    ..
820                } = n
821                {
822                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
823                        DebugInstantiate::Building => instantiate_network::<D>(
824                            input.metadata().location_id.root(),
825                            metadata.location_id.root(),
826                            processes,
827                            clusters,
828                        ),
829
830                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
831                    };
832
833                    *instantiate_fn = DebugInstantiateFinalized {
834                        sink: sink_expr,
835                        source: source_expr,
836                        connect_fn: Some(connect_fn),
837                    }
838                    .into();
839                } else if let HydroNode::ExternalInput {
840                    from_external_key,
841                    from_port_id,
842                    from_many,
843                    codec_type,
844                    port_hint,
845                    instantiate_fn,
846                    metadata,
847                    ..
848                } = n
849                {
850                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
851                        DebugInstantiate::Building => {
852                            let from_node = externals
853                                .get(*from_external_key)
854                                .unwrap_or_else(|| {
855                                    panic!(
856                                        "A external used in the graph was not instantiated: {}",
857                                        from_external_key,
858                                    )
859                                })
860                                .clone();
861
862                            match metadata.location_id.root() {
863                                &LocationId::Process(process_key) => {
864                                    let to_node = processes
865                                        .get(process_key)
866                                        .unwrap_or_else(|| {
867                                            panic!("A process used in the graph was not instantiated: {}", process_key)
868                                        })
869                                        .clone();
870
871                                    let sink_port = from_node.next_port();
872                                    let source_port = to_node.next_port();
873
874                                    from_node.register(*from_port_id, sink_port.clone());
875
876                                    (
877                                        (
878                                            parse_quote!(DUMMY),
879                                            if *from_many {
880                                                D::e2o_many_source(
881                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
882                                                    &to_node, &source_port,
883                                                    codec_type.0.as_ref(),
884                                                    format!("{}_{}", *from_external_key, *from_port_id)
885                                                )
886                                            } else {
887                                                D::e2o_source(
888                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
889                                                    &from_node, &sink_port,
890                                                    &to_node, &source_port,
891                                                    codec_type.0.as_ref(),
892                                                    format!("{}_{}", *from_external_key, *from_port_id)
893                                                )
894                                            },
895                                        ),
896                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
897                                    )
898                                }
899                                LocationId::Cluster(_) => todo!(),
900                                _ => panic!()
901                            }
902                        },
903
904                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
905                    };
906
907                    *instantiate_fn = DebugInstantiateFinalized {
908                        sink: sink_expr,
909                        source: source_expr,
910                        connect_fn: Some(connect_fn),
911                    }
912                    .into();
913                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
914                    let element_type = match &metadata.collection_kind {
915                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
916                        _ => panic!("Embedded source must have Stream collection kind"),
917                    };
918                    let location_key = match metadata.location_id.root() {
919                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
920                        _ => panic!("Embedded source must be on a process or cluster"),
921                    };
922                    D::register_embedded_input(
923                        &mut refcell_env.borrow_mut(),
924                        location_key,
925                        ident,
926                        &element_type,
927                    );
928                }
929            },
930            seen_tees,
931            false,
932        );
933    }
934
935    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
936        self.transform_bottom_up(
937            &mut |l| {
938                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
939                    match instantiate_fn {
940                        DebugInstantiate::Building => panic!("network not built"),
941
942                        DebugInstantiate::Finalized(finalized) => {
943                            (finalized.connect_fn.take().unwrap())();
944                        }
945                    }
946                }
947            },
948            &mut |n| {
949                if let HydroNode::Network { instantiate_fn, .. }
950                | HydroNode::ExternalInput { instantiate_fn, .. } = n
951                {
952                    match instantiate_fn {
953                        DebugInstantiate::Building => panic!("network not built"),
954
955                        DebugInstantiate::Finalized(finalized) => {
956                            (finalized.connect_fn.take().unwrap())();
957                        }
958                    }
959                }
960            },
961            seen_tees,
962            false,
963        );
964    }
965
966    pub fn transform_bottom_up(
967        &mut self,
968        transform_root: &mut impl FnMut(&mut HydroRoot),
969        transform_node: &mut impl FnMut(&mut HydroNode),
970        seen_tees: &mut SeenTees,
971        check_well_formed: bool,
972    ) {
973        self.transform_children(
974            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
975            seen_tees,
976        );
977
978        transform_root(self);
979    }
980
981    pub fn transform_children(
982        &mut self,
983        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
984        seen_tees: &mut SeenTees,
985    ) {
986        match self {
987            HydroRoot::ForEach { input, .. }
988            | HydroRoot::SendExternal { input, .. }
989            | HydroRoot::DestSink { input, .. }
990            | HydroRoot::CycleSink { input, .. }
991            | HydroRoot::EmbeddedOutput { input, .. } => {
992                transform(input, seen_tees);
993            }
994        }
995    }
996
997    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
998        match self {
999            HydroRoot::ForEach {
1000                f,
1001                input,
1002                op_metadata,
1003            } => HydroRoot::ForEach {
1004                f: f.clone(),
1005                input: Box::new(input.deep_clone(seen_tees)),
1006                op_metadata: op_metadata.clone(),
1007            },
1008            HydroRoot::SendExternal {
1009                to_external_key,
1010                to_port_id,
1011                to_many,
1012                unpaired,
1013                serialize_fn,
1014                instantiate_fn,
1015                input,
1016                op_metadata,
1017            } => HydroRoot::SendExternal {
1018                to_external_key: *to_external_key,
1019                to_port_id: *to_port_id,
1020                to_many: *to_many,
1021                unpaired: *unpaired,
1022                serialize_fn: serialize_fn.clone(),
1023                instantiate_fn: instantiate_fn.clone(),
1024                input: Box::new(input.deep_clone(seen_tees)),
1025                op_metadata: op_metadata.clone(),
1026            },
1027            HydroRoot::DestSink {
1028                sink,
1029                input,
1030                op_metadata,
1031            } => HydroRoot::DestSink {
1032                sink: sink.clone(),
1033                input: Box::new(input.deep_clone(seen_tees)),
1034                op_metadata: op_metadata.clone(),
1035            },
1036            HydroRoot::CycleSink {
1037                ident,
1038                input,
1039                op_metadata,
1040            } => HydroRoot::CycleSink {
1041                ident: ident.clone(),
1042                input: Box::new(input.deep_clone(seen_tees)),
1043                op_metadata: op_metadata.clone(),
1044            },
1045            HydroRoot::EmbeddedOutput {
1046                ident,
1047                input,
1048                op_metadata,
1049            } => HydroRoot::EmbeddedOutput {
1050                ident: ident.clone(),
1051                input: Box::new(input.deep_clone(seen_tees)),
1052                op_metadata: op_metadata.clone(),
1053            },
1054        }
1055    }
1056
1057    #[cfg(feature = "build")]
1058    pub fn emit<'a, D: Deploy<'a>>(
1059        &mut self,
1060        graph_builders: &mut dyn DfirBuilder,
1061        seen_tees: &mut SeenTees,
1062        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1063        next_stmt_id: &mut usize,
1064    ) {
1065        self.emit_core::<D>(
1066            &mut BuildersOrCallback::<
1067                fn(&mut HydroRoot, &mut usize),
1068                fn(&mut HydroNode, &mut usize),
1069            >::Builders(graph_builders),
1070            seen_tees,
1071            built_tees,
1072            next_stmt_id,
1073        );
1074    }
1075
1076    #[cfg(feature = "build")]
1077    pub fn emit_core<'a, D: Deploy<'a>>(
1078        &mut self,
1079        builders_or_callback: &mut BuildersOrCallback<
1080            impl FnMut(&mut HydroRoot, &mut usize),
1081            impl FnMut(&mut HydroNode, &mut usize),
1082        >,
1083        seen_tees: &mut SeenTees,
1084        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1085        next_stmt_id: &mut usize,
1086    ) {
1087        match self {
1088            HydroRoot::ForEach { f, input, .. } => {
1089                let input_ident =
1090                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1091
1092                match builders_or_callback {
1093                    BuildersOrCallback::Builders(graph_builders) => {
1094                        graph_builders
1095                            .get_dfir_mut(&input.metadata().location_id)
1096                            .add_dfir(
1097                                parse_quote! {
1098                                    #input_ident -> for_each(#f);
1099                                },
1100                                None,
1101                                Some(&next_stmt_id.to_string()),
1102                            );
1103                    }
1104                    BuildersOrCallback::Callback(leaf_callback, _) => {
1105                        leaf_callback(self, next_stmt_id);
1106                    }
1107                }
1108
1109                *next_stmt_id += 1;
1110            }
1111
1112            HydroRoot::SendExternal {
1113                serialize_fn,
1114                instantiate_fn,
1115                input,
1116                ..
1117            } => {
1118                let input_ident =
1119                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1120
1121                match builders_or_callback {
1122                    BuildersOrCallback::Builders(graph_builders) => {
1123                        let (sink_expr, _) = match instantiate_fn {
1124                            DebugInstantiate::Building => (
1125                                syn::parse_quote!(DUMMY_SINK),
1126                                syn::parse_quote!(DUMMY_SOURCE),
1127                            ),
1128
1129                            DebugInstantiate::Finalized(finalized) => {
1130                                (finalized.sink.clone(), finalized.source.clone())
1131                            }
1132                        };
1133
1134                        graph_builders.create_external_output(
1135                            &input.metadata().location_id,
1136                            sink_expr,
1137                            &input_ident,
1138                            serialize_fn.as_ref(),
1139                            *next_stmt_id,
1140                        );
1141                    }
1142                    BuildersOrCallback::Callback(leaf_callback, _) => {
1143                        leaf_callback(self, next_stmt_id);
1144                    }
1145                }
1146
1147                *next_stmt_id += 1;
1148            }
1149
1150            HydroRoot::DestSink { sink, input, .. } => {
1151                let input_ident =
1152                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1153
1154                match builders_or_callback {
1155                    BuildersOrCallback::Builders(graph_builders) => {
1156                        graph_builders
1157                            .get_dfir_mut(&input.metadata().location_id)
1158                            .add_dfir(
1159                                parse_quote! {
1160                                    #input_ident -> dest_sink(#sink);
1161                                },
1162                                None,
1163                                Some(&next_stmt_id.to_string()),
1164                            );
1165                    }
1166                    BuildersOrCallback::Callback(leaf_callback, _) => {
1167                        leaf_callback(self, next_stmt_id);
1168                    }
1169                }
1170
1171                *next_stmt_id += 1;
1172            }
1173
1174            HydroRoot::CycleSink { ident, input, .. } => {
1175                let input_ident =
1176                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1177
1178                match builders_or_callback {
1179                    BuildersOrCallback::Builders(graph_builders) => {
1180                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1181                            CollectionKind::KeyedSingleton {
1182                                key_type,
1183                                value_type,
1184                                ..
1185                            }
1186                            | CollectionKind::KeyedStream {
1187                                key_type,
1188                                value_type,
1189                                ..
1190                            } => {
1191                                parse_quote!((#key_type, #value_type))
1192                            }
1193                            CollectionKind::Stream { element_type, .. }
1194                            | CollectionKind::Singleton { element_type, .. }
1195                            | CollectionKind::Optional { element_type, .. } => {
1196                                parse_quote!(#element_type)
1197                            }
1198                        };
1199
1200                        graph_builders
1201                            .get_dfir_mut(&input.metadata().location_id)
1202                            .add_dfir(
1203                                parse_quote! {
1204                                    #ident = #input_ident -> identity::<#elem_type>();
1205                                },
1206                                None,
1207                                None,
1208                            );
1209                    }
1210                    // No ID, no callback
1211                    BuildersOrCallback::Callback(_, _) => {}
1212                }
1213            }
1214
1215            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1216                let input_ident =
1217                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1218
1219                match builders_or_callback {
1220                    BuildersOrCallback::Builders(graph_builders) => {
1221                        graph_builders
1222                            .get_dfir_mut(&input.metadata().location_id)
1223                            .add_dfir(
1224                                parse_quote! {
1225                                    #input_ident -> for_each(&mut #ident);
1226                                },
1227                                None,
1228                                Some(&next_stmt_id.to_string()),
1229                            );
1230                    }
1231                    BuildersOrCallback::Callback(leaf_callback, _) => {
1232                        leaf_callback(self, next_stmt_id);
1233                    }
1234                }
1235
1236                *next_stmt_id += 1;
1237            }
1238        }
1239    }
1240
1241    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1242        match self {
1243            HydroRoot::ForEach { op_metadata, .. }
1244            | HydroRoot::SendExternal { op_metadata, .. }
1245            | HydroRoot::DestSink { op_metadata, .. }
1246            | HydroRoot::CycleSink { op_metadata, .. }
1247            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1248        }
1249    }
1250
1251    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1252        match self {
1253            HydroRoot::ForEach { op_metadata, .. }
1254            | HydroRoot::SendExternal { op_metadata, .. }
1255            | HydroRoot::DestSink { op_metadata, .. }
1256            | HydroRoot::CycleSink { op_metadata, .. }
1257            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1258        }
1259    }
1260
1261    pub fn input(&self) -> &HydroNode {
1262        match self {
1263            HydroRoot::ForEach { input, .. }
1264            | HydroRoot::SendExternal { input, .. }
1265            | HydroRoot::DestSink { input, .. }
1266            | HydroRoot::CycleSink { input, .. }
1267            | HydroRoot::EmbeddedOutput { input, .. } => input,
1268        }
1269    }
1270
1271    pub fn input_metadata(&self) -> &HydroIrMetadata {
1272        self.input().metadata()
1273    }
1274
1275    pub fn print_root(&self) -> String {
1276        match self {
1277            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1278            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1279            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1280            HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1281            HydroRoot::EmbeddedOutput { ident, .. } => {
1282                format!("EmbeddedOutput({})", ident)
1283            }
1284        }
1285    }
1286
1287    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1288        match self {
1289            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1290                transform(f);
1291            }
1292            HydroRoot::SendExternal { .. }
1293            | HydroRoot::CycleSink { .. }
1294            | HydroRoot::EmbeddedOutput { .. } => {}
1295        }
1296    }
1297}
1298
1299#[cfg(feature = "build")]
1300pub fn emit<'a, D: Deploy<'a>>(
1301    ir: &mut Vec<HydroRoot>,
1302) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1303    let mut builders = SecondaryMap::new();
1304    let mut seen_tees = HashMap::new();
1305    let mut built_tees = HashMap::new();
1306    let mut next_stmt_id = 0;
1307    for leaf in ir {
1308        leaf.emit::<D>(
1309            &mut builders,
1310            &mut seen_tees,
1311            &mut built_tees,
1312            &mut next_stmt_id,
1313        );
1314    }
1315    builders
1316}
1317
1318#[cfg(feature = "build")]
1319pub fn traverse_dfir<'a, D: Deploy<'a>>(
1320    ir: &mut [HydroRoot],
1321    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1322    transform_node: impl FnMut(&mut HydroNode, &mut usize),
1323) {
1324    let mut seen_tees = HashMap::new();
1325    let mut built_tees = HashMap::new();
1326    let mut next_stmt_id = 0;
1327    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1328    ir.iter_mut().for_each(|leaf| {
1329        leaf.emit_core::<D>(
1330            &mut callback,
1331            &mut seen_tees,
1332            &mut built_tees,
1333            &mut next_stmt_id,
1334        );
1335    });
1336}
1337
1338pub fn transform_bottom_up(
1339    ir: &mut [HydroRoot],
1340    transform_root: &mut impl FnMut(&mut HydroRoot),
1341    transform_node: &mut impl FnMut(&mut HydroNode),
1342    check_well_formed: bool,
1343) {
1344    let mut seen_tees = HashMap::new();
1345    ir.iter_mut().for_each(|leaf| {
1346        leaf.transform_bottom_up(
1347            transform_root,
1348            transform_node,
1349            &mut seen_tees,
1350            check_well_formed,
1351        );
1352    });
1353}
1354
1355pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1356    let mut seen_tees = HashMap::new();
1357    ir.iter()
1358        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1359        .collect()
1360}
1361
1362type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1363thread_local! {
1364    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1365}
1366
1367pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1368    PRINTED_TEES.with(|printed_tees| {
1369        let mut printed_tees_mut = printed_tees.borrow_mut();
1370        *printed_tees_mut = Some((0, HashMap::new()));
1371        drop(printed_tees_mut);
1372
1373        let ret = f();
1374
1375        let mut printed_tees_mut = printed_tees.borrow_mut();
1376        *printed_tees_mut = None;
1377
1378        ret
1379    })
1380}
1381
1382pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1383
1384impl TeeNode {
1385    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1386        Rc::as_ptr(&self.0)
1387    }
1388}
1389
1390impl Debug for TeeNode {
1391    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1392        PRINTED_TEES.with(|printed_tees| {
1393            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1394            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1395
1396            if let Some(printed_tees_mut) = printed_tees_mut {
1397                if let Some(existing) = printed_tees_mut
1398                    .1
1399                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1400                {
1401                    write!(f, "<tee {}>", existing)
1402                } else {
1403                    let next_id = printed_tees_mut.0;
1404                    printed_tees_mut.0 += 1;
1405                    printed_tees_mut
1406                        .1
1407                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1408                    drop(printed_tees_mut_borrow);
1409                    write!(f, "<tee {}>: ", next_id)?;
1410                    Debug::fmt(&self.0.borrow(), f)
1411                }
1412            } else {
1413                drop(printed_tees_mut_borrow);
1414                write!(f, "<tee>: ")?;
1415                Debug::fmt(&self.0.borrow(), f)
1416            }
1417        })
1418    }
1419}
1420
1421impl Hash for TeeNode {
1422    fn hash<H: Hasher>(&self, state: &mut H) {
1423        self.0.borrow_mut().hash(state);
1424    }
1425}
1426
1427#[derive(Clone, PartialEq, Eq, Debug)]
1428pub enum BoundKind {
1429    Unbounded,
1430    Bounded,
1431}
1432
1433#[derive(Clone, PartialEq, Eq, Debug)]
1434pub enum StreamOrder {
1435    NoOrder,
1436    TotalOrder,
1437}
1438
1439#[derive(Clone, PartialEq, Eq, Debug)]
1440pub enum StreamRetry {
1441    AtLeastOnce,
1442    ExactlyOnce,
1443}
1444
1445#[derive(Clone, PartialEq, Eq, Debug)]
1446pub enum KeyedSingletonBoundKind {
1447    Unbounded,
1448    BoundedValue,
1449    Bounded,
1450}
1451
1452#[derive(Clone, PartialEq, Eq, Debug)]
1453pub enum CollectionKind {
1454    Stream {
1455        bound: BoundKind,
1456        order: StreamOrder,
1457        retry: StreamRetry,
1458        element_type: DebugType,
1459    },
1460    Singleton {
1461        bound: BoundKind,
1462        element_type: DebugType,
1463    },
1464    Optional {
1465        bound: BoundKind,
1466        element_type: DebugType,
1467    },
1468    KeyedStream {
1469        bound: BoundKind,
1470        value_order: StreamOrder,
1471        value_retry: StreamRetry,
1472        key_type: DebugType,
1473        value_type: DebugType,
1474    },
1475    KeyedSingleton {
1476        bound: KeyedSingletonBoundKind,
1477        key_type: DebugType,
1478        value_type: DebugType,
1479    },
1480}
1481
1482impl CollectionKind {
1483    pub fn is_bounded(&self) -> bool {
1484        matches!(
1485            self,
1486            CollectionKind::Stream {
1487                bound: BoundKind::Bounded,
1488                ..
1489            } | CollectionKind::Singleton {
1490                bound: BoundKind::Bounded,
1491                ..
1492            } | CollectionKind::Optional {
1493                bound: BoundKind::Bounded,
1494                ..
1495            } | CollectionKind::KeyedStream {
1496                bound: BoundKind::Bounded,
1497                ..
1498            } | CollectionKind::KeyedSingleton {
1499                bound: KeyedSingletonBoundKind::Bounded,
1500                ..
1501            }
1502        )
1503    }
1504}
1505
1506#[derive(Clone)]
1507pub struct HydroIrMetadata {
1508    pub location_id: LocationId,
1509    pub collection_kind: CollectionKind,
1510    pub cardinality: Option<usize>,
1511    pub tag: Option<String>,
1512    pub op: HydroIrOpMetadata,
1513}
1514
1515// HydroIrMetadata shouldn't be used to hash or compare
1516impl Hash for HydroIrMetadata {
1517    fn hash<H: Hasher>(&self, _: &mut H) {}
1518}
1519
1520impl PartialEq for HydroIrMetadata {
1521    fn eq(&self, _: &Self) -> bool {
1522        true
1523    }
1524}
1525
1526impl Eq for HydroIrMetadata {}
1527
1528impl Debug for HydroIrMetadata {
1529    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1530        f.debug_struct("HydroIrMetadata")
1531            .field("location_id", &self.location_id)
1532            .field("collection_kind", &self.collection_kind)
1533            .finish()
1534    }
1535}
1536
1537/// Metadata that is specific to the operator itself, rather than its outputs.
1538/// This is available on _both_ inner nodes and roots.
1539#[derive(Clone)]
1540pub struct HydroIrOpMetadata {
1541    pub backtrace: Backtrace,
1542    pub cpu_usage: Option<f64>,
1543    pub network_recv_cpu_usage: Option<f64>,
1544    pub id: Option<usize>,
1545}
1546
1547impl HydroIrOpMetadata {
1548    #[expect(
1549        clippy::new_without_default,
1550        reason = "explicit calls to new ensure correct backtrace bounds"
1551    )]
1552    pub fn new() -> HydroIrOpMetadata {
1553        Self::new_with_skip(1)
1554    }
1555
1556    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1557        HydroIrOpMetadata {
1558            backtrace: Backtrace::get_backtrace(2 + skip_count),
1559            cpu_usage: None,
1560            network_recv_cpu_usage: None,
1561            id: None,
1562        }
1563    }
1564}
1565
1566impl Debug for HydroIrOpMetadata {
1567    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1568        f.debug_struct("HydroIrOpMetadata").finish()
1569    }
1570}
1571
1572impl Hash for HydroIrOpMetadata {
1573    fn hash<H: Hasher>(&self, _: &mut H) {}
1574}
1575
1576/// An intermediate node in a Hydro graph, which consumes data
1577/// from upstream nodes and emits data to downstream nodes.
1578#[derive(Debug, Hash)]
1579pub enum HydroNode {
1580    Placeholder,
1581
1582    /// Manually "casts" between two different collection kinds.
1583    ///
1584    /// Using this IR node requires special care, since it bypasses many of Hydro's core
1585    /// correctness checks. In particular, the user must ensure that every possible
1586    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
1587    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
1588    /// collection. This ensures that the simulator does not miss any possible outputs.
1589    Cast {
1590        inner: Box<HydroNode>,
1591        metadata: HydroIrMetadata,
1592    },
1593
1594    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
1595    /// interpretation of the input stream.
1596    ///
1597    /// In production, this simply passes through the input, but in simulation, this operator
1598    /// explicitly selects a randomized interpretation.
1599    ObserveNonDet {
1600        inner: Box<HydroNode>,
1601        trusted: bool, // if true, we do not need to simulate non-determinism
1602        metadata: HydroIrMetadata,
1603    },
1604
1605    Source {
1606        source: HydroSource,
1607        metadata: HydroIrMetadata,
1608    },
1609
1610    SingletonSource {
1611        value: DebugExpr,
1612        metadata: HydroIrMetadata,
1613    },
1614
1615    CycleSource {
1616        ident: syn::Ident,
1617        metadata: HydroIrMetadata,
1618    },
1619
1620    Tee {
1621        inner: TeeNode,
1622        metadata: HydroIrMetadata,
1623    },
1624
1625    BeginAtomic {
1626        inner: Box<HydroNode>,
1627        metadata: HydroIrMetadata,
1628    },
1629
1630    EndAtomic {
1631        inner: Box<HydroNode>,
1632        metadata: HydroIrMetadata,
1633    },
1634
1635    Batch {
1636        inner: Box<HydroNode>,
1637        metadata: HydroIrMetadata,
1638    },
1639
1640    YieldConcat {
1641        inner: Box<HydroNode>,
1642        metadata: HydroIrMetadata,
1643    },
1644
1645    Chain {
1646        first: Box<HydroNode>,
1647        second: Box<HydroNode>,
1648        metadata: HydroIrMetadata,
1649    },
1650
1651    ChainFirst {
1652        first: Box<HydroNode>,
1653        second: Box<HydroNode>,
1654        metadata: HydroIrMetadata,
1655    },
1656
1657    CrossProduct {
1658        left: Box<HydroNode>,
1659        right: Box<HydroNode>,
1660        metadata: HydroIrMetadata,
1661    },
1662
1663    CrossSingleton {
1664        left: Box<HydroNode>,
1665        right: Box<HydroNode>,
1666        metadata: HydroIrMetadata,
1667    },
1668
1669    Join {
1670        left: Box<HydroNode>,
1671        right: Box<HydroNode>,
1672        metadata: HydroIrMetadata,
1673    },
1674
1675    Difference {
1676        pos: Box<HydroNode>,
1677        neg: Box<HydroNode>,
1678        metadata: HydroIrMetadata,
1679    },
1680
1681    AntiJoin {
1682        pos: Box<HydroNode>,
1683        neg: Box<HydroNode>,
1684        metadata: HydroIrMetadata,
1685    },
1686
1687    ResolveFutures {
1688        input: Box<HydroNode>,
1689        metadata: HydroIrMetadata,
1690    },
1691    ResolveFuturesOrdered {
1692        input: Box<HydroNode>,
1693        metadata: HydroIrMetadata,
1694    },
1695
1696    Map {
1697        f: DebugExpr,
1698        input: Box<HydroNode>,
1699        metadata: HydroIrMetadata,
1700    },
1701    FlatMap {
1702        f: DebugExpr,
1703        input: Box<HydroNode>,
1704        metadata: HydroIrMetadata,
1705    },
1706    Filter {
1707        f: DebugExpr,
1708        input: Box<HydroNode>,
1709        metadata: HydroIrMetadata,
1710    },
1711    FilterMap {
1712        f: DebugExpr,
1713        input: Box<HydroNode>,
1714        metadata: HydroIrMetadata,
1715    },
1716
1717    DeferTick {
1718        input: Box<HydroNode>,
1719        metadata: HydroIrMetadata,
1720    },
1721    Enumerate {
1722        input: Box<HydroNode>,
1723        metadata: HydroIrMetadata,
1724    },
1725    Inspect {
1726        f: DebugExpr,
1727        input: Box<HydroNode>,
1728        metadata: HydroIrMetadata,
1729    },
1730
1731    Unique {
1732        input: Box<HydroNode>,
1733        metadata: HydroIrMetadata,
1734    },
1735
1736    Sort {
1737        input: Box<HydroNode>,
1738        metadata: HydroIrMetadata,
1739    },
1740    Fold {
1741        init: DebugExpr,
1742        acc: DebugExpr,
1743        input: Box<HydroNode>,
1744        metadata: HydroIrMetadata,
1745    },
1746
1747    Scan {
1748        init: DebugExpr,
1749        acc: DebugExpr,
1750        input: Box<HydroNode>,
1751        metadata: HydroIrMetadata,
1752    },
1753    FoldKeyed {
1754        init: DebugExpr,
1755        acc: DebugExpr,
1756        input: Box<HydroNode>,
1757        metadata: HydroIrMetadata,
1758    },
1759
1760    Reduce {
1761        f: DebugExpr,
1762        input: Box<HydroNode>,
1763        metadata: HydroIrMetadata,
1764    },
1765    ReduceKeyed {
1766        f: DebugExpr,
1767        input: Box<HydroNode>,
1768        metadata: HydroIrMetadata,
1769    },
1770    ReduceKeyedWatermark {
1771        f: DebugExpr,
1772        input: Box<HydroNode>,
1773        watermark: Box<HydroNode>,
1774        metadata: HydroIrMetadata,
1775    },
1776
1777    Network {
1778        name: Option<String>,
1779        serialize_fn: Option<DebugExpr>,
1780        instantiate_fn: DebugInstantiate,
1781        deserialize_fn: Option<DebugExpr>,
1782        input: Box<HydroNode>,
1783        metadata: HydroIrMetadata,
1784    },
1785
1786    ExternalInput {
1787        from_external_key: LocationKey,
1788        from_port_id: ExternalPortId,
1789        from_many: bool,
1790        codec_type: DebugType,
1791        port_hint: NetworkHint,
1792        instantiate_fn: DebugInstantiate,
1793        deserialize_fn: Option<DebugExpr>,
1794        metadata: HydroIrMetadata,
1795    },
1796
1797    Counter {
1798        tag: String,
1799        duration: DebugExpr,
1800        prefix: String,
1801        input: Box<HydroNode>,
1802        metadata: HydroIrMetadata,
1803    },
1804}
1805
1806pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1807pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1808
1809impl HydroNode {
1810    pub fn transform_bottom_up(
1811        &mut self,
1812        transform: &mut impl FnMut(&mut HydroNode),
1813        seen_tees: &mut SeenTees,
1814        check_well_formed: bool,
1815    ) {
1816        self.transform_children(
1817            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1818            seen_tees,
1819        );
1820
1821        transform(self);
1822
1823        let self_location = self.metadata().location_id.root();
1824
1825        if check_well_formed {
1826            match &*self {
1827                HydroNode::Network { .. } => {}
1828                _ => {
1829                    self.input_metadata().iter().for_each(|i| {
1830                        if i.location_id.root() != self_location {
1831                            panic!(
1832                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1833                                i,
1834                                i.location_id.root(),
1835                                self,
1836                                self_location
1837                            )
1838                        }
1839                    });
1840                }
1841            }
1842        }
1843    }
1844
1845    #[inline(always)]
1846    pub fn transform_children(
1847        &mut self,
1848        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1849        seen_tees: &mut SeenTees,
1850    ) {
1851        match self {
1852            HydroNode::Placeholder => {
1853                panic!();
1854            }
1855
1856            HydroNode::Source { .. }
1857            | HydroNode::SingletonSource { .. }
1858            | HydroNode::CycleSource { .. }
1859            | HydroNode::ExternalInput { .. } => {}
1860
1861            HydroNode::Tee { inner, .. } => {
1862                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1863                    *inner = TeeNode(transformed.clone());
1864                } else {
1865                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1866                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1867                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1868                    transform(&mut orig, seen_tees);
1869                    *transformed_cell.borrow_mut() = orig;
1870                    *inner = TeeNode(transformed_cell);
1871                }
1872            }
1873
1874            HydroNode::Cast { inner, .. }
1875            | HydroNode::ObserveNonDet { inner, .. }
1876            | HydroNode::BeginAtomic { inner, .. }
1877            | HydroNode::EndAtomic { inner, .. }
1878            | HydroNode::Batch { inner, .. }
1879            | HydroNode::YieldConcat { inner, .. } => {
1880                transform(inner.as_mut(), seen_tees);
1881            }
1882
1883            HydroNode::Chain { first, second, .. } => {
1884                transform(first.as_mut(), seen_tees);
1885                transform(second.as_mut(), seen_tees);
1886            }
1887
1888            HydroNode::ChainFirst { first, second, .. } => {
1889                transform(first.as_mut(), seen_tees);
1890                transform(second.as_mut(), seen_tees);
1891            }
1892
1893            HydroNode::CrossSingleton { left, right, .. }
1894            | HydroNode::CrossProduct { left, right, .. }
1895            | HydroNode::Join { left, right, .. } => {
1896                transform(left.as_mut(), seen_tees);
1897                transform(right.as_mut(), seen_tees);
1898            }
1899
1900            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1901                transform(pos.as_mut(), seen_tees);
1902                transform(neg.as_mut(), seen_tees);
1903            }
1904
1905            HydroNode::ReduceKeyedWatermark {
1906                input, watermark, ..
1907            } => {
1908                transform(input.as_mut(), seen_tees);
1909                transform(watermark.as_mut(), seen_tees);
1910            }
1911
1912            HydroNode::Map { input, .. }
1913            | HydroNode::ResolveFutures { input, .. }
1914            | HydroNode::ResolveFuturesOrdered { input, .. }
1915            | HydroNode::FlatMap { input, .. }
1916            | HydroNode::Filter { input, .. }
1917            | HydroNode::FilterMap { input, .. }
1918            | HydroNode::Sort { input, .. }
1919            | HydroNode::DeferTick { input, .. }
1920            | HydroNode::Enumerate { input, .. }
1921            | HydroNode::Inspect { input, .. }
1922            | HydroNode::Unique { input, .. }
1923            | HydroNode::Network { input, .. }
1924            | HydroNode::Fold { input, .. }
1925            | HydroNode::Scan { input, .. }
1926            | HydroNode::FoldKeyed { input, .. }
1927            | HydroNode::Reduce { input, .. }
1928            | HydroNode::ReduceKeyed { input, .. }
1929            | HydroNode::Counter { input, .. } => {
1930                transform(input.as_mut(), seen_tees);
1931            }
1932        }
1933    }
1934
1935    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1936        match self {
1937            HydroNode::Placeholder => HydroNode::Placeholder,
1938            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1939                inner: Box::new(inner.deep_clone(seen_tees)),
1940                metadata: metadata.clone(),
1941            },
1942            HydroNode::ObserveNonDet {
1943                inner,
1944                trusted,
1945                metadata,
1946            } => HydroNode::ObserveNonDet {
1947                inner: Box::new(inner.deep_clone(seen_tees)),
1948                trusted: *trusted,
1949                metadata: metadata.clone(),
1950            },
1951            HydroNode::Source { source, metadata } => HydroNode::Source {
1952                source: source.clone(),
1953                metadata: metadata.clone(),
1954            },
1955            HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1956                value: value.clone(),
1957                metadata: metadata.clone(),
1958            },
1959            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1960                ident: ident.clone(),
1961                metadata: metadata.clone(),
1962            },
1963            HydroNode::Tee { inner, metadata } => {
1964                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1965                    HydroNode::Tee {
1966                        inner: TeeNode(transformed.clone()),
1967                        metadata: metadata.clone(),
1968                    }
1969                } else {
1970                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1971                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1972                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1973                    *new_rc.borrow_mut() = cloned;
1974                    HydroNode::Tee {
1975                        inner: TeeNode(new_rc),
1976                        metadata: metadata.clone(),
1977                    }
1978                }
1979            }
1980            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1981                inner: Box::new(inner.deep_clone(seen_tees)),
1982                metadata: metadata.clone(),
1983            },
1984            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1985                inner: Box::new(inner.deep_clone(seen_tees)),
1986                metadata: metadata.clone(),
1987            },
1988            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1989                inner: Box::new(inner.deep_clone(seen_tees)),
1990                metadata: metadata.clone(),
1991            },
1992            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1993                inner: Box::new(inner.deep_clone(seen_tees)),
1994                metadata: metadata.clone(),
1995            },
1996            HydroNode::Chain {
1997                first,
1998                second,
1999                metadata,
2000            } => HydroNode::Chain {
2001                first: Box::new(first.deep_clone(seen_tees)),
2002                second: Box::new(second.deep_clone(seen_tees)),
2003                metadata: metadata.clone(),
2004            },
2005            HydroNode::ChainFirst {
2006                first,
2007                second,
2008                metadata,
2009            } => HydroNode::ChainFirst {
2010                first: Box::new(first.deep_clone(seen_tees)),
2011                second: Box::new(second.deep_clone(seen_tees)),
2012                metadata: metadata.clone(),
2013            },
2014            HydroNode::CrossProduct {
2015                left,
2016                right,
2017                metadata,
2018            } => HydroNode::CrossProduct {
2019                left: Box::new(left.deep_clone(seen_tees)),
2020                right: Box::new(right.deep_clone(seen_tees)),
2021                metadata: metadata.clone(),
2022            },
2023            HydroNode::CrossSingleton {
2024                left,
2025                right,
2026                metadata,
2027            } => HydroNode::CrossSingleton {
2028                left: Box::new(left.deep_clone(seen_tees)),
2029                right: Box::new(right.deep_clone(seen_tees)),
2030                metadata: metadata.clone(),
2031            },
2032            HydroNode::Join {
2033                left,
2034                right,
2035                metadata,
2036            } => HydroNode::Join {
2037                left: Box::new(left.deep_clone(seen_tees)),
2038                right: Box::new(right.deep_clone(seen_tees)),
2039                metadata: metadata.clone(),
2040            },
2041            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2042                pos: Box::new(pos.deep_clone(seen_tees)),
2043                neg: Box::new(neg.deep_clone(seen_tees)),
2044                metadata: metadata.clone(),
2045            },
2046            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2047                pos: Box::new(pos.deep_clone(seen_tees)),
2048                neg: Box::new(neg.deep_clone(seen_tees)),
2049                metadata: metadata.clone(),
2050            },
2051            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2052                input: Box::new(input.deep_clone(seen_tees)),
2053                metadata: metadata.clone(),
2054            },
2055            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2056                HydroNode::ResolveFuturesOrdered {
2057                    input: Box::new(input.deep_clone(seen_tees)),
2058                    metadata: metadata.clone(),
2059                }
2060            }
2061            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2062                f: f.clone(),
2063                input: Box::new(input.deep_clone(seen_tees)),
2064                metadata: metadata.clone(),
2065            },
2066            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2067                f: f.clone(),
2068                input: Box::new(input.deep_clone(seen_tees)),
2069                metadata: metadata.clone(),
2070            },
2071            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2072                f: f.clone(),
2073                input: Box::new(input.deep_clone(seen_tees)),
2074                metadata: metadata.clone(),
2075            },
2076            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2077                f: f.clone(),
2078                input: Box::new(input.deep_clone(seen_tees)),
2079                metadata: metadata.clone(),
2080            },
2081            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2082                input: Box::new(input.deep_clone(seen_tees)),
2083                metadata: metadata.clone(),
2084            },
2085            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2086                input: Box::new(input.deep_clone(seen_tees)),
2087                metadata: metadata.clone(),
2088            },
2089            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2090                f: f.clone(),
2091                input: Box::new(input.deep_clone(seen_tees)),
2092                metadata: metadata.clone(),
2093            },
2094            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2095                input: Box::new(input.deep_clone(seen_tees)),
2096                metadata: metadata.clone(),
2097            },
2098            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2099                input: Box::new(input.deep_clone(seen_tees)),
2100                metadata: metadata.clone(),
2101            },
2102            HydroNode::Fold {
2103                init,
2104                acc,
2105                input,
2106                metadata,
2107            } => HydroNode::Fold {
2108                init: init.clone(),
2109                acc: acc.clone(),
2110                input: Box::new(input.deep_clone(seen_tees)),
2111                metadata: metadata.clone(),
2112            },
2113            HydroNode::Scan {
2114                init,
2115                acc,
2116                input,
2117                metadata,
2118            } => HydroNode::Scan {
2119                init: init.clone(),
2120                acc: acc.clone(),
2121                input: Box::new(input.deep_clone(seen_tees)),
2122                metadata: metadata.clone(),
2123            },
2124            HydroNode::FoldKeyed {
2125                init,
2126                acc,
2127                input,
2128                metadata,
2129            } => HydroNode::FoldKeyed {
2130                init: init.clone(),
2131                acc: acc.clone(),
2132                input: Box::new(input.deep_clone(seen_tees)),
2133                metadata: metadata.clone(),
2134            },
2135            HydroNode::ReduceKeyedWatermark {
2136                f,
2137                input,
2138                watermark,
2139                metadata,
2140            } => HydroNode::ReduceKeyedWatermark {
2141                f: f.clone(),
2142                input: Box::new(input.deep_clone(seen_tees)),
2143                watermark: Box::new(watermark.deep_clone(seen_tees)),
2144                metadata: metadata.clone(),
2145            },
2146            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2147                f: f.clone(),
2148                input: Box::new(input.deep_clone(seen_tees)),
2149                metadata: metadata.clone(),
2150            },
2151            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2152                f: f.clone(),
2153                input: Box::new(input.deep_clone(seen_tees)),
2154                metadata: metadata.clone(),
2155            },
2156            HydroNode::Network {
2157                name,
2158                serialize_fn,
2159                instantiate_fn,
2160                deserialize_fn,
2161                input,
2162                metadata,
2163            } => HydroNode::Network {
2164                name: name.clone(),
2165                serialize_fn: serialize_fn.clone(),
2166                instantiate_fn: instantiate_fn.clone(),
2167                deserialize_fn: deserialize_fn.clone(),
2168                input: Box::new(input.deep_clone(seen_tees)),
2169                metadata: metadata.clone(),
2170            },
2171            HydroNode::ExternalInput {
2172                from_external_key,
2173                from_port_id,
2174                from_many,
2175                codec_type,
2176                port_hint,
2177                instantiate_fn,
2178                deserialize_fn,
2179                metadata,
2180            } => HydroNode::ExternalInput {
2181                from_external_key: *from_external_key,
2182                from_port_id: *from_port_id,
2183                from_many: *from_many,
2184                codec_type: codec_type.clone(),
2185                port_hint: *port_hint,
2186                instantiate_fn: instantiate_fn.clone(),
2187                deserialize_fn: deserialize_fn.clone(),
2188                metadata: metadata.clone(),
2189            },
2190            HydroNode::Counter {
2191                tag,
2192                duration,
2193                prefix,
2194                input,
2195                metadata,
2196            } => HydroNode::Counter {
2197                tag: tag.clone(),
2198                duration: duration.clone(),
2199                prefix: prefix.clone(),
2200                input: Box::new(input.deep_clone(seen_tees)),
2201                metadata: metadata.clone(),
2202            },
2203        }
2204    }
2205
2206    #[cfg(feature = "build")]
2207    pub fn emit_core<'a, D: Deploy<'a>>(
2208        &mut self,
2209        builders_or_callback: &mut BuildersOrCallback<
2210            impl FnMut(&mut HydroRoot, &mut usize),
2211            impl FnMut(&mut HydroNode, &mut usize),
2212        >,
2213        seen_tees: &mut SeenTees,
2214        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2215        next_stmt_id: &mut usize,
2216    ) -> syn::Ident {
2217        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2218
2219        self.transform_bottom_up(
2220            &mut |node: &mut HydroNode| {
2221                let out_location = node.metadata().location_id.clone();
2222                match node {
2223                    HydroNode::Placeholder => {
2224                        panic!()
2225                    }
2226
2227                    HydroNode::Cast { .. } => {
2228                        // Cast passes through the input ident unchanged
2229                        // The input ident is already on the stack from processing the child
2230                        match builders_or_callback {
2231                            BuildersOrCallback::Builders(_) => {}
2232                            BuildersOrCallback::Callback(_, node_callback) => {
2233                                node_callback(node, next_stmt_id);
2234                            }
2235                        }
2236
2237                        *next_stmt_id += 1;
2238                        // input_ident stays on stack as output
2239                    }
2240
2241                    HydroNode::ObserveNonDet {
2242                        inner,
2243                        trusted,
2244                        metadata,
2245                        ..
2246                    } => {
2247                        let inner_ident = ident_stack.pop().unwrap();
2248
2249                        let observe_ident =
2250                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2251
2252                        match builders_or_callback {
2253                            BuildersOrCallback::Builders(graph_builders) => {
2254                                graph_builders.observe_nondet(
2255                                    *trusted,
2256                                    &inner.metadata().location_id,
2257                                    inner_ident,
2258                                    &inner.metadata().collection_kind,
2259                                    &observe_ident,
2260                                    &metadata.collection_kind,
2261                                    &metadata.op,
2262                                );
2263                            }
2264                            BuildersOrCallback::Callback(_, node_callback) => {
2265                                node_callback(node, next_stmt_id);
2266                            }
2267                        }
2268
2269                        *next_stmt_id += 1;
2270
2271                        ident_stack.push(observe_ident);
2272                    }
2273
2274                    HydroNode::Batch {
2275                        inner, metadata, ..
2276                    } => {
2277                        let inner_ident = ident_stack.pop().unwrap();
2278
2279                        let batch_ident =
2280                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2281
2282                        match builders_or_callback {
2283                            BuildersOrCallback::Builders(graph_builders) => {
2284                                graph_builders.batch(
2285                                    inner_ident,
2286                                    &inner.metadata().location_id,
2287                                    &inner.metadata().collection_kind,
2288                                    &batch_ident,
2289                                    &out_location,
2290                                    &metadata.op,
2291                                );
2292                            }
2293                            BuildersOrCallback::Callback(_, node_callback) => {
2294                                node_callback(node, next_stmt_id);
2295                            }
2296                        }
2297
2298                        *next_stmt_id += 1;
2299
2300                        ident_stack.push(batch_ident);
2301                    }
2302
2303                    HydroNode::YieldConcat { inner, .. } => {
2304                        let inner_ident = ident_stack.pop().unwrap();
2305
2306                        let yield_ident =
2307                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2308
2309                        match builders_or_callback {
2310                            BuildersOrCallback::Builders(graph_builders) => {
2311                                graph_builders.yield_from_tick(
2312                                    inner_ident,
2313                                    &inner.metadata().location_id,
2314                                    &inner.metadata().collection_kind,
2315                                    &yield_ident,
2316                                    &out_location,
2317                                );
2318                            }
2319                            BuildersOrCallback::Callback(_, node_callback) => {
2320                                node_callback(node, next_stmt_id);
2321                            }
2322                        }
2323
2324                        *next_stmt_id += 1;
2325
2326                        ident_stack.push(yield_ident);
2327                    }
2328
2329                    HydroNode::BeginAtomic { inner, metadata } => {
2330                        let inner_ident = ident_stack.pop().unwrap();
2331
2332                        let begin_ident =
2333                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2334
2335                        match builders_or_callback {
2336                            BuildersOrCallback::Builders(graph_builders) => {
2337                                graph_builders.begin_atomic(
2338                                    inner_ident,
2339                                    &inner.metadata().location_id,
2340                                    &inner.metadata().collection_kind,
2341                                    &begin_ident,
2342                                    &out_location,
2343                                    &metadata.op,
2344                                );
2345                            }
2346                            BuildersOrCallback::Callback(_, node_callback) => {
2347                                node_callback(node, next_stmt_id);
2348                            }
2349                        }
2350
2351                        *next_stmt_id += 1;
2352
2353                        ident_stack.push(begin_ident);
2354                    }
2355
2356                    HydroNode::EndAtomic { inner, .. } => {
2357                        let inner_ident = ident_stack.pop().unwrap();
2358
2359                        let end_ident =
2360                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2361
2362                        match builders_or_callback {
2363                            BuildersOrCallback::Builders(graph_builders) => {
2364                                graph_builders.end_atomic(
2365                                    inner_ident,
2366                                    &inner.metadata().location_id,
2367                                    &inner.metadata().collection_kind,
2368                                    &end_ident,
2369                                );
2370                            }
2371                            BuildersOrCallback::Callback(_, node_callback) => {
2372                                node_callback(node, next_stmt_id);
2373                            }
2374                        }
2375
2376                        *next_stmt_id += 1;
2377
2378                        ident_stack.push(end_ident);
2379                    }
2380
2381                    HydroNode::Source {
2382                        source, metadata, ..
2383                    } => {
2384                        if let HydroSource::ExternalNetwork() = source {
2385                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2386                        } else {
2387                            let source_ident =
2388                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2389
2390                            let source_stmt = match source {
2391                                HydroSource::Stream(expr) => {
2392                                    debug_assert!(metadata.location_id.is_top_level());
2393                                    parse_quote! {
2394                                        #source_ident = source_stream(#expr);
2395                                    }
2396                                }
2397
2398                                HydroSource::ExternalNetwork() => {
2399                                    unreachable!()
2400                                }
2401
2402                                HydroSource::Iter(expr) => {
2403                                    if metadata.location_id.is_top_level() {
2404                                        parse_quote! {
2405                                            #source_ident = source_iter(#expr);
2406                                        }
2407                                    } else {
2408                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
2409                                        parse_quote! {
2410                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2411                                        }
2412                                    }
2413                                }
2414
2415                                HydroSource::Spin() => {
2416                                    debug_assert!(metadata.location_id.is_top_level());
2417                                    parse_quote! {
2418                                        #source_ident = spin();
2419                                    }
2420                                }
2421
2422                                HydroSource::ClusterMembers(location_id) => {
2423                                    debug_assert!(metadata.location_id.is_top_level());
2424
2425                                    let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2426                                        D::cluster_membership_stream(location_id),
2427                                        &(),
2428                                    );
2429
2430                                    parse_quote! {
2431                                        #source_ident = source_stream(#expr);
2432                                    }
2433                                }
2434
2435                                HydroSource::Embedded(ident) => {
2436                                    parse_quote! {
2437                                        #source_ident = source_stream(#ident);
2438                                    }
2439                                }
2440                            };
2441
2442                            match builders_or_callback {
2443                                BuildersOrCallback::Builders(graph_builders) => {
2444                                    let builder = graph_builders.get_dfir_mut(&out_location);
2445                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2446                                }
2447                                BuildersOrCallback::Callback(_, node_callback) => {
2448                                    node_callback(node, next_stmt_id);
2449                                }
2450                            }
2451
2452                            *next_stmt_id += 1;
2453
2454                            ident_stack.push(source_ident);
2455                        }
2456                    }
2457
2458                    HydroNode::SingletonSource { value, metadata } => {
2459                        let source_ident =
2460                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2461
2462                        match builders_or_callback {
2463                            BuildersOrCallback::Builders(graph_builders) => {
2464                                let builder = graph_builders.get_dfir_mut(&out_location);
2465
2466                                if metadata.location_id.is_top_level()
2467                                    && metadata.collection_kind.is_bounded()
2468                                {
2469                                    builder.add_dfir(
2470                                        parse_quote! {
2471                                            #source_ident = source_iter([#value]);
2472                                        },
2473                                        None,
2474                                        Some(&next_stmt_id.to_string()),
2475                                    );
2476                                } else {
2477                                    builder.add_dfir(
2478                                        parse_quote! {
2479                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2480                                        },
2481                                        None,
2482                                        Some(&next_stmt_id.to_string()),
2483                                    );
2484                                }
2485                            }
2486                            BuildersOrCallback::Callback(_, node_callback) => {
2487                                node_callback(node, next_stmt_id);
2488                            }
2489                        }
2490
2491                        *next_stmt_id += 1;
2492
2493                        ident_stack.push(source_ident);
2494                    }
2495
2496                    HydroNode::CycleSource { ident, .. } => {
2497                        let ident = ident.clone();
2498
2499                        match builders_or_callback {
2500                            BuildersOrCallback::Builders(_) => {}
2501                            BuildersOrCallback::Callback(_, node_callback) => {
2502                                node_callback(node, next_stmt_id);
2503                            }
2504                        }
2505
2506                        // consume a stmt id even though we did not emit anything so that we can instrument this
2507                        *next_stmt_id += 1;
2508
2509                        ident_stack.push(ident);
2510                    }
2511
2512                    HydroNode::Tee { inner, .. } => {
2513                        let ret_ident = if let Some(teed_from) =
2514                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2515                        {
2516                            match builders_or_callback {
2517                                BuildersOrCallback::Builders(_) => {}
2518                                BuildersOrCallback::Callback(_, node_callback) => {
2519                                    node_callback(node, next_stmt_id);
2520                                }
2521                            }
2522
2523                            teed_from.clone()
2524                        } else {
2525                            // The inner node was already processed by transform_bottom_up,
2526                            // so its ident is on the stack
2527                            let inner_ident = ident_stack.pop().unwrap();
2528
2529                            let tee_ident =
2530                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2531
2532                            built_tees.insert(
2533                                inner.0.as_ref() as *const RefCell<HydroNode>,
2534                                tee_ident.clone(),
2535                            );
2536
2537                            match builders_or_callback {
2538                                BuildersOrCallback::Builders(graph_builders) => {
2539                                    let builder = graph_builders.get_dfir_mut(&out_location);
2540                                    builder.add_dfir(
2541                                        parse_quote! {
2542                                            #tee_ident = #inner_ident -> tee();
2543                                        },
2544                                        None,
2545                                        Some(&next_stmt_id.to_string()),
2546                                    );
2547                                }
2548                                BuildersOrCallback::Callback(_, node_callback) => {
2549                                    node_callback(node, next_stmt_id);
2550                                }
2551                            }
2552
2553                            tee_ident
2554                        };
2555
2556                        // we consume a stmt id regardless of if we emit the tee() operator,
2557                        // so that during rewrites we touch all recipients of the tee()
2558
2559                        *next_stmt_id += 1;
2560                        ident_stack.push(ret_ident);
2561                    }
2562
2563                    HydroNode::Chain { .. } => {
2564                        // Children are processed left-to-right, so second is on top
2565                        let second_ident = ident_stack.pop().unwrap();
2566                        let first_ident = ident_stack.pop().unwrap();
2567
2568                        let chain_ident =
2569                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2570
2571                        match builders_or_callback {
2572                            BuildersOrCallback::Builders(graph_builders) => {
2573                                let builder = graph_builders.get_dfir_mut(&out_location);
2574                                builder.add_dfir(
2575                                    parse_quote! {
2576                                        #chain_ident = chain();
2577                                        #first_ident -> [0]#chain_ident;
2578                                        #second_ident -> [1]#chain_ident;
2579                                    },
2580                                    None,
2581                                    Some(&next_stmt_id.to_string()),
2582                                );
2583                            }
2584                            BuildersOrCallback::Callback(_, node_callback) => {
2585                                node_callback(node, next_stmt_id);
2586                            }
2587                        }
2588
2589                        *next_stmt_id += 1;
2590
2591                        ident_stack.push(chain_ident);
2592                    }
2593
2594                    HydroNode::ChainFirst { .. } => {
2595                        let second_ident = ident_stack.pop().unwrap();
2596                        let first_ident = ident_stack.pop().unwrap();
2597
2598                        let chain_ident =
2599                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2600
2601                        match builders_or_callback {
2602                            BuildersOrCallback::Builders(graph_builders) => {
2603                                let builder = graph_builders.get_dfir_mut(&out_location);
2604                                builder.add_dfir(
2605                                    parse_quote! {
2606                                        #chain_ident = chain_first_n(1);
2607                                        #first_ident -> [0]#chain_ident;
2608                                        #second_ident -> [1]#chain_ident;
2609                                    },
2610                                    None,
2611                                    Some(&next_stmt_id.to_string()),
2612                                );
2613                            }
2614                            BuildersOrCallback::Callback(_, node_callback) => {
2615                                node_callback(node, next_stmt_id);
2616                            }
2617                        }
2618
2619                        *next_stmt_id += 1;
2620
2621                        ident_stack.push(chain_ident);
2622                    }
2623
2624                    HydroNode::CrossSingleton { right, .. } => {
2625                        let right_ident = ident_stack.pop().unwrap();
2626                        let left_ident = ident_stack.pop().unwrap();
2627
2628                        let cross_ident =
2629                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2630
2631                        match builders_or_callback {
2632                            BuildersOrCallback::Builders(graph_builders) => {
2633                                let builder = graph_builders.get_dfir_mut(&out_location);
2634
2635                                if right.metadata().location_id.is_top_level()
2636                                    && right.metadata().collection_kind.is_bounded()
2637                                {
2638                                    builder.add_dfir(
2639                                        parse_quote! {
2640                                            #cross_ident = cross_singleton();
2641                                            #left_ident -> [input]#cross_ident;
2642                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
2643                                        },
2644                                        None,
2645                                        Some(&next_stmt_id.to_string()),
2646                                    );
2647                                } else {
2648                                    builder.add_dfir(
2649                                        parse_quote! {
2650                                            #cross_ident = cross_singleton();
2651                                            #left_ident -> [input]#cross_ident;
2652                                            #right_ident -> [single]#cross_ident;
2653                                        },
2654                                        None,
2655                                        Some(&next_stmt_id.to_string()),
2656                                    );
2657                                }
2658                            }
2659                            BuildersOrCallback::Callback(_, node_callback) => {
2660                                node_callback(node, next_stmt_id);
2661                            }
2662                        }
2663
2664                        *next_stmt_id += 1;
2665
2666                        ident_stack.push(cross_ident);
2667                    }
2668
2669                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2670                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2671                            parse_quote!(cross_join_multiset)
2672                        } else {
2673                            parse_quote!(join_multiset)
2674                        };
2675
2676                        let (HydroNode::CrossProduct { left, right, .. }
2677                        | HydroNode::Join { left, right, .. }) = node
2678                        else {
2679                            unreachable!()
2680                        };
2681
2682                        let is_top_level = left.metadata().location_id.is_top_level()
2683                            && right.metadata().location_id.is_top_level();
2684                        let left_lifetime = if left.metadata().location_id.is_top_level() {
2685                            quote!('static)
2686                        } else {
2687                            quote!('tick)
2688                        };
2689
2690                        let right_lifetime = if right.metadata().location_id.is_top_level() {
2691                            quote!('static)
2692                        } else {
2693                            quote!('tick)
2694                        };
2695
2696                        let right_ident = ident_stack.pop().unwrap();
2697                        let left_ident = ident_stack.pop().unwrap();
2698
2699                        let stream_ident =
2700                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2701
2702                        match builders_or_callback {
2703                            BuildersOrCallback::Builders(graph_builders) => {
2704                                let builder = graph_builders.get_dfir_mut(&out_location);
2705                                builder.add_dfir(
2706                                    if is_top_level {
2707                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
2708                                        // a multiset_delta() to negate the replay behavior
2709                                        parse_quote! {
2710                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2711                                            #left_ident -> [0]#stream_ident;
2712                                            #right_ident -> [1]#stream_ident;
2713                                        }
2714                                    } else {
2715                                        parse_quote! {
2716                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2717                                            #left_ident -> [0]#stream_ident;
2718                                            #right_ident -> [1]#stream_ident;
2719                                        }
2720                                    }
2721                                    ,
2722                                    None,
2723                                    Some(&next_stmt_id.to_string()),
2724                                );
2725                            }
2726                            BuildersOrCallback::Callback(_, node_callback) => {
2727                                node_callback(node, next_stmt_id);
2728                            }
2729                        }
2730
2731                        *next_stmt_id += 1;
2732
2733                        ident_stack.push(stream_ident);
2734                    }
2735
2736                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2737                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2738                            parse_quote!(difference)
2739                        } else {
2740                            parse_quote!(anti_join)
2741                        };
2742
2743                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2744                            node
2745                        else {
2746                            unreachable!()
2747                        };
2748
2749                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2750                            quote!('static)
2751                        } else {
2752                            quote!('tick)
2753                        };
2754
2755                        let neg_ident = ident_stack.pop().unwrap();
2756                        let pos_ident = ident_stack.pop().unwrap();
2757
2758                        let stream_ident =
2759                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2760
2761                        match builders_or_callback {
2762                            BuildersOrCallback::Builders(graph_builders) => {
2763                                let builder = graph_builders.get_dfir_mut(&out_location);
2764                                builder.add_dfir(
2765                                    parse_quote! {
2766                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
2767                                        #pos_ident -> [pos]#stream_ident;
2768                                        #neg_ident -> [neg]#stream_ident;
2769                                    },
2770                                    None,
2771                                    Some(&next_stmt_id.to_string()),
2772                                );
2773                            }
2774                            BuildersOrCallback::Callback(_, node_callback) => {
2775                                node_callback(node, next_stmt_id);
2776                            }
2777                        }
2778
2779                        *next_stmt_id += 1;
2780
2781                        ident_stack.push(stream_ident);
2782                    }
2783
2784                    HydroNode::ResolveFutures { .. } => {
2785                        let input_ident = ident_stack.pop().unwrap();
2786
2787                        let futures_ident =
2788                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2789
2790                        match builders_or_callback {
2791                            BuildersOrCallback::Builders(graph_builders) => {
2792                                let builder = graph_builders.get_dfir_mut(&out_location);
2793                                builder.add_dfir(
2794                                    parse_quote! {
2795                                        #futures_ident = #input_ident -> resolve_futures();
2796                                    },
2797                                    None,
2798                                    Some(&next_stmt_id.to_string()),
2799                                );
2800                            }
2801                            BuildersOrCallback::Callback(_, node_callback) => {
2802                                node_callback(node, next_stmt_id);
2803                            }
2804                        }
2805
2806                        *next_stmt_id += 1;
2807
2808                        ident_stack.push(futures_ident);
2809                    }
2810
2811                    HydroNode::ResolveFuturesOrdered { .. } => {
2812                        let input_ident = ident_stack.pop().unwrap();
2813
2814                        let futures_ident =
2815                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2816
2817                        match builders_or_callback {
2818                            BuildersOrCallback::Builders(graph_builders) => {
2819                                let builder = graph_builders.get_dfir_mut(&out_location);
2820                                builder.add_dfir(
2821                                    parse_quote! {
2822                                        #futures_ident = #input_ident -> resolve_futures_ordered();
2823                                    },
2824                                    None,
2825                                    Some(&next_stmt_id.to_string()),
2826                                );
2827                            }
2828                            BuildersOrCallback::Callback(_, node_callback) => {
2829                                node_callback(node, next_stmt_id);
2830                            }
2831                        }
2832
2833                        *next_stmt_id += 1;
2834
2835                        ident_stack.push(futures_ident);
2836                    }
2837
2838                    HydroNode::Map { f, .. } => {
2839                        let input_ident = ident_stack.pop().unwrap();
2840
2841                        let map_ident =
2842                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2843
2844                        match builders_or_callback {
2845                            BuildersOrCallback::Builders(graph_builders) => {
2846                                let builder = graph_builders.get_dfir_mut(&out_location);
2847                                builder.add_dfir(
2848                                    parse_quote! {
2849                                        #map_ident = #input_ident -> map(#f);
2850                                    },
2851                                    None,
2852                                    Some(&next_stmt_id.to_string()),
2853                                );
2854                            }
2855                            BuildersOrCallback::Callback(_, node_callback) => {
2856                                node_callback(node, next_stmt_id);
2857                            }
2858                        }
2859
2860                        *next_stmt_id += 1;
2861
2862                        ident_stack.push(map_ident);
2863                    }
2864
2865                    HydroNode::FlatMap { f, .. } => {
2866                        let input_ident = ident_stack.pop().unwrap();
2867
2868                        let flat_map_ident =
2869                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2870
2871                        match builders_or_callback {
2872                            BuildersOrCallback::Builders(graph_builders) => {
2873                                let builder = graph_builders.get_dfir_mut(&out_location);
2874                                builder.add_dfir(
2875                                    parse_quote! {
2876                                        #flat_map_ident = #input_ident -> flat_map(#f);
2877                                    },
2878                                    None,
2879                                    Some(&next_stmt_id.to_string()),
2880                                );
2881                            }
2882                            BuildersOrCallback::Callback(_, node_callback) => {
2883                                node_callback(node, next_stmt_id);
2884                            }
2885                        }
2886
2887                        *next_stmt_id += 1;
2888
2889                        ident_stack.push(flat_map_ident);
2890                    }
2891
2892                    HydroNode::Filter { f, .. } => {
2893                        let input_ident = ident_stack.pop().unwrap();
2894
2895                        let filter_ident =
2896                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2897
2898                        match builders_or_callback {
2899                            BuildersOrCallback::Builders(graph_builders) => {
2900                                let builder = graph_builders.get_dfir_mut(&out_location);
2901                                builder.add_dfir(
2902                                    parse_quote! {
2903                                        #filter_ident = #input_ident -> filter(#f);
2904                                    },
2905                                    None,
2906                                    Some(&next_stmt_id.to_string()),
2907                                );
2908                            }
2909                            BuildersOrCallback::Callback(_, node_callback) => {
2910                                node_callback(node, next_stmt_id);
2911                            }
2912                        }
2913
2914                        *next_stmt_id += 1;
2915
2916                        ident_stack.push(filter_ident);
2917                    }
2918
2919                    HydroNode::FilterMap { f, .. } => {
2920                        let input_ident = ident_stack.pop().unwrap();
2921
2922                        let filter_map_ident =
2923                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2924
2925                        match builders_or_callback {
2926                            BuildersOrCallback::Builders(graph_builders) => {
2927                                let builder = graph_builders.get_dfir_mut(&out_location);
2928                                builder.add_dfir(
2929                                    parse_quote! {
2930                                        #filter_map_ident = #input_ident -> filter_map(#f);
2931                                    },
2932                                    None,
2933                                    Some(&next_stmt_id.to_string()),
2934                                );
2935                            }
2936                            BuildersOrCallback::Callback(_, node_callback) => {
2937                                node_callback(node, next_stmt_id);
2938                            }
2939                        }
2940
2941                        *next_stmt_id += 1;
2942
2943                        ident_stack.push(filter_map_ident);
2944                    }
2945
2946                    HydroNode::Sort { .. } => {
2947                        let input_ident = ident_stack.pop().unwrap();
2948
2949                        let sort_ident =
2950                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2951
2952                        match builders_or_callback {
2953                            BuildersOrCallback::Builders(graph_builders) => {
2954                                let builder = graph_builders.get_dfir_mut(&out_location);
2955                                builder.add_dfir(
2956                                    parse_quote! {
2957                                        #sort_ident = #input_ident -> sort();
2958                                    },
2959                                    None,
2960                                    Some(&next_stmt_id.to_string()),
2961                                );
2962                            }
2963                            BuildersOrCallback::Callback(_, node_callback) => {
2964                                node_callback(node, next_stmt_id);
2965                            }
2966                        }
2967
2968                        *next_stmt_id += 1;
2969
2970                        ident_stack.push(sort_ident);
2971                    }
2972
2973                    HydroNode::DeferTick { .. } => {
2974                        let input_ident = ident_stack.pop().unwrap();
2975
2976                        let defer_tick_ident =
2977                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2978
2979                        match builders_or_callback {
2980                            BuildersOrCallback::Builders(graph_builders) => {
2981                                let builder = graph_builders.get_dfir_mut(&out_location);
2982                                builder.add_dfir(
2983                                    parse_quote! {
2984                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
2985                                    },
2986                                    None,
2987                                    Some(&next_stmt_id.to_string()),
2988                                );
2989                            }
2990                            BuildersOrCallback::Callback(_, node_callback) => {
2991                                node_callback(node, next_stmt_id);
2992                            }
2993                        }
2994
2995                        *next_stmt_id += 1;
2996
2997                        ident_stack.push(defer_tick_ident);
2998                    }
2999
3000                    HydroNode::Enumerate { input, .. } => {
3001                        let input_ident = ident_stack.pop().unwrap();
3002
3003                        let enumerate_ident =
3004                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3005
3006                        match builders_or_callback {
3007                            BuildersOrCallback::Builders(graph_builders) => {
3008                                let builder = graph_builders.get_dfir_mut(&out_location);
3009                                let lifetime = if input.metadata().location_id.is_top_level() {
3010                                    quote!('static)
3011                                } else {
3012                                    quote!('tick)
3013                                };
3014                                builder.add_dfir(
3015                                    parse_quote! {
3016                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3017                                    },
3018                                    None,
3019                                    Some(&next_stmt_id.to_string()),
3020                                );
3021                            }
3022                            BuildersOrCallback::Callback(_, node_callback) => {
3023                                node_callback(node, next_stmt_id);
3024                            }
3025                        }
3026
3027                        *next_stmt_id += 1;
3028
3029                        ident_stack.push(enumerate_ident);
3030                    }
3031
3032                    HydroNode::Inspect { f, .. } => {
3033                        let input_ident = ident_stack.pop().unwrap();
3034
3035                        let inspect_ident =
3036                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3037
3038                        match builders_or_callback {
3039                            BuildersOrCallback::Builders(graph_builders) => {
3040                                let builder = graph_builders.get_dfir_mut(&out_location);
3041                                builder.add_dfir(
3042                                    parse_quote! {
3043                                        #inspect_ident = #input_ident -> inspect(#f);
3044                                    },
3045                                    None,
3046                                    Some(&next_stmt_id.to_string()),
3047                                );
3048                            }
3049                            BuildersOrCallback::Callback(_, node_callback) => {
3050                                node_callback(node, next_stmt_id);
3051                            }
3052                        }
3053
3054                        *next_stmt_id += 1;
3055
3056                        ident_stack.push(inspect_ident);
3057                    }
3058
3059                    HydroNode::Unique { input, .. } => {
3060                        let input_ident = ident_stack.pop().unwrap();
3061
3062                        let unique_ident =
3063                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3064
3065                        match builders_or_callback {
3066                            BuildersOrCallback::Builders(graph_builders) => {
3067                                let builder = graph_builders.get_dfir_mut(&out_location);
3068                                let lifetime = if input.metadata().location_id.is_top_level() {
3069                                    quote!('static)
3070                                } else {
3071                                    quote!('tick)
3072                                };
3073
3074                                builder.add_dfir(
3075                                    parse_quote! {
3076                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3077                                    },
3078                                    None,
3079                                    Some(&next_stmt_id.to_string()),
3080                                );
3081                            }
3082                            BuildersOrCallback::Callback(_, node_callback) => {
3083                                node_callback(node, next_stmt_id);
3084                            }
3085                        }
3086
3087                        *next_stmt_id += 1;
3088
3089                        ident_stack.push(unique_ident);
3090                    }
3091
3092                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3093                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3094                            if input.metadata().location_id.is_top_level()
3095                                && input.metadata().collection_kind.is_bounded()
3096                            {
3097                                parse_quote!(fold_no_replay)
3098                            } else {
3099                                parse_quote!(fold)
3100                            }
3101                        } else if matches!(node, HydroNode::Scan { .. }) {
3102                            parse_quote!(scan)
3103                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3104                            if input.metadata().location_id.is_top_level()
3105                                && input.metadata().collection_kind.is_bounded()
3106                            {
3107                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3108                            } else {
3109                                parse_quote!(fold_keyed)
3110                            }
3111                        } else {
3112                            unreachable!()
3113                        };
3114
3115                        let (HydroNode::Fold { input, .. }
3116                        | HydroNode::FoldKeyed { input, .. }
3117                        | HydroNode::Scan { input, .. }) = node
3118                        else {
3119                            unreachable!()
3120                        };
3121
3122                        let lifetime = if input.metadata().location_id.is_top_level() {
3123                            quote!('static)
3124                        } else {
3125                            quote!('tick)
3126                        };
3127
3128                        let input_ident = ident_stack.pop().unwrap();
3129
3130                        let (HydroNode::Fold { init, acc, .. }
3131                        | HydroNode::FoldKeyed { init, acc, .. }
3132                        | HydroNode::Scan { init, acc, .. }) = &*node
3133                        else {
3134                            unreachable!()
3135                        };
3136
3137                        let fold_ident =
3138                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3139
3140                        match builders_or_callback {
3141                            BuildersOrCallback::Builders(graph_builders) => {
3142                                if matches!(node, HydroNode::Fold { .. })
3143                                    && node.metadata().location_id.is_top_level()
3144                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3145                                    && graph_builders.singleton_intermediates()
3146                                    && !node.metadata().collection_kind.is_bounded()
3147                                {
3148                                    let builder = graph_builders.get_dfir_mut(&out_location);
3149
3150                                    let acc: syn::Expr = parse_quote!({
3151                                        let mut __inner = #acc;
3152                                        move |__state, __value| {
3153                                            __inner(__state, __value);
3154                                            Some(__state.clone())
3155                                        }
3156                                    });
3157
3158                                    builder.add_dfir(
3159                                        parse_quote! {
3160                                            source_iter([(#init)()]) -> [0]#fold_ident;
3161                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3162                                            #fold_ident = chain();
3163                                        },
3164                                        None,
3165                                        Some(&next_stmt_id.to_string()),
3166                                    );
3167                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3168                                    && node.metadata().location_id.is_top_level()
3169                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3170                                    && graph_builders.singleton_intermediates()
3171                                    && !node.metadata().collection_kind.is_bounded()
3172                                {
3173                                    let builder = graph_builders.get_dfir_mut(&out_location);
3174
3175                                    let acc: syn::Expr = parse_quote!({
3176                                        let mut __init = #init;
3177                                        let mut __inner = #acc;
3178                                        move |__state, __kv: (_, _)| {
3179                                            // TODO(shadaj): we can avoid the clone when the entry exists
3180                                            let __state = __state
3181                                                .entry(::std::clone::Clone::clone(&__kv.0))
3182                                                .or_insert_with(|| (__init)());
3183                                            __inner(__state, __kv.1);
3184                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3185                                        }
3186                                    });
3187
3188                                    builder.add_dfir(
3189                                        parse_quote! {
3190                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3191                                        },
3192                                        None,
3193                                        Some(&next_stmt_id.to_string()),
3194                                    );
3195                                } else {
3196                                    let builder = graph_builders.get_dfir_mut(&out_location);
3197                                    builder.add_dfir(
3198                                        parse_quote! {
3199                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3200                                        },
3201                                        None,
3202                                        Some(&next_stmt_id.to_string()),
3203                                    );
3204                                }
3205                            }
3206                            BuildersOrCallback::Callback(_, node_callback) => {
3207                                node_callback(node, next_stmt_id);
3208                            }
3209                        }
3210
3211                        *next_stmt_id += 1;
3212
3213                        ident_stack.push(fold_ident);
3214                    }
3215
3216                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3217                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3218                            if input.metadata().location_id.is_top_level()
3219                                && input.metadata().collection_kind.is_bounded()
3220                            {
3221                                parse_quote!(reduce_no_replay)
3222                            } else {
3223                                parse_quote!(reduce)
3224                            }
3225                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3226                            if input.metadata().location_id.is_top_level()
3227                                && input.metadata().collection_kind.is_bounded()
3228                            {
3229                                todo!(
3230                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3231                                )
3232                            } else {
3233                                parse_quote!(reduce_keyed)
3234                            }
3235                        } else {
3236                            unreachable!()
3237                        };
3238
3239                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3240                        else {
3241                            unreachable!()
3242                        };
3243
3244                        let lifetime = if input.metadata().location_id.is_top_level() {
3245                            quote!('static)
3246                        } else {
3247                            quote!('tick)
3248                        };
3249
3250                        let input_ident = ident_stack.pop().unwrap();
3251
3252                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3253                        else {
3254                            unreachable!()
3255                        };
3256
3257                        let reduce_ident =
3258                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3259
3260                        match builders_or_callback {
3261                            BuildersOrCallback::Builders(graph_builders) => {
3262                                if matches!(node, HydroNode::Reduce { .. })
3263                                    && node.metadata().location_id.is_top_level()
3264                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3265                                    && graph_builders.singleton_intermediates()
3266                                    && !node.metadata().collection_kind.is_bounded()
3267                                {
3268                                    todo!(
3269                                        "Reduce with optional intermediates is not yet supported in simulator"
3270                                    );
3271                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3272                                    && node.metadata().location_id.is_top_level()
3273                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3274                                    && graph_builders.singleton_intermediates()
3275                                    && !node.metadata().collection_kind.is_bounded()
3276                                {
3277                                    todo!(
3278                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3279                                    );
3280                                } else {
3281                                    let builder = graph_builders.get_dfir_mut(&out_location);
3282                                    builder.add_dfir(
3283                                        parse_quote! {
3284                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3285                                        },
3286                                        None,
3287                                        Some(&next_stmt_id.to_string()),
3288                                    );
3289                                }
3290                            }
3291                            BuildersOrCallback::Callback(_, node_callback) => {
3292                                node_callback(node, next_stmt_id);
3293                            }
3294                        }
3295
3296                        *next_stmt_id += 1;
3297
3298                        ident_stack.push(reduce_ident);
3299                    }
3300
3301                    HydroNode::ReduceKeyedWatermark {
3302                        f,
3303                        input,
3304                        metadata,
3305                        ..
3306                    } => {
3307                        let lifetime = if input.metadata().location_id.is_top_level() {
3308                            quote!('static)
3309                        } else {
3310                            quote!('tick)
3311                        };
3312
3313                        // watermark is processed second, so it's on top
3314                        let watermark_ident = ident_stack.pop().unwrap();
3315                        let input_ident = ident_stack.pop().unwrap();
3316
3317                        let chain_ident = syn::Ident::new(
3318                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3319                            Span::call_site(),
3320                        );
3321
3322                        let fold_ident =
3323                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3324
3325                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3326                            && input.metadata().collection_kind.is_bounded()
3327                        {
3328                            parse_quote!(fold_no_replay)
3329                        } else {
3330                            parse_quote!(fold)
3331                        };
3332
3333                        match builders_or_callback {
3334                            BuildersOrCallback::Builders(graph_builders) => {
3335                                if metadata.location_id.is_top_level()
3336                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3337                                    && graph_builders.singleton_intermediates()
3338                                    && !metadata.collection_kind.is_bounded()
3339                                {
3340                                    todo!(
3341                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3342                                    )
3343                                } else {
3344                                    let builder = graph_builders.get_dfir_mut(&out_location);
3345                                    builder.add_dfir(
3346                                        parse_quote! {
3347                                            #chain_ident = chain();
3348                                            #input_ident
3349                                                -> map(|x| (Some(x), None))
3350                                                -> [0]#chain_ident;
3351                                            #watermark_ident
3352                                                -> map(|watermark| (None, Some(watermark)))
3353                                                -> [1]#chain_ident;
3354
3355                                            #fold_ident = #chain_ident
3356                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3357                                                    let __reduce_keyed_fn = #f;
3358                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3359                                                        if let Some((k, v)) = opt_payload {
3360                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3361                                                                if k <= curr_watermark {
3362                                                                    return;
3363                                                                }
3364                                                            }
3365                                                            match map.entry(k) {
3366                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3367                                                                    e.insert(v);
3368                                                                }
3369                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3370                                                                    __reduce_keyed_fn(e.get_mut(), v);
3371                                                                }
3372                                                            }
3373                                                        } else {
3374                                                            let watermark = opt_watermark.unwrap();
3375                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3376                                                                if watermark <= curr_watermark {
3377                                                                    return;
3378                                                                }
3379                                                            }
3380                                                            *opt_curr_watermark = opt_watermark;
3381                                                            map.retain(|k, _| *k > watermark);
3382                                                        }
3383                                                    }
3384                                                })
3385                                                -> flat_map(|(map, _curr_watermark)| map);
3386                                        },
3387                                        None,
3388                                        Some(&next_stmt_id.to_string()),
3389                                    );
3390                                }
3391                            }
3392                            BuildersOrCallback::Callback(_, node_callback) => {
3393                                node_callback(node, next_stmt_id);
3394                            }
3395                        }
3396
3397                        *next_stmt_id += 1;
3398
3399                        ident_stack.push(fold_ident);
3400                    }
3401
3402                    HydroNode::Network {
3403                        serialize_fn: serialize_pipeline,
3404                        instantiate_fn,
3405                        deserialize_fn: deserialize_pipeline,
3406                        input,
3407                        ..
3408                    } => {
3409                        let input_ident = ident_stack.pop().unwrap();
3410
3411                        let receiver_stream_ident =
3412                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3413
3414                        match builders_or_callback {
3415                            BuildersOrCallback::Builders(graph_builders) => {
3416                                let (sink_expr, source_expr) = match instantiate_fn {
3417                                    DebugInstantiate::Building => (
3418                                        syn::parse_quote!(DUMMY_SINK),
3419                                        syn::parse_quote!(DUMMY_SOURCE),
3420                                    ),
3421
3422                                    DebugInstantiate::Finalized(finalized) => {
3423                                        (finalized.sink.clone(), finalized.source.clone())
3424                                    }
3425                                };
3426
3427                                graph_builders.create_network(
3428                                    &input.metadata().location_id,
3429                                    &out_location,
3430                                    input_ident,
3431                                    &receiver_stream_ident,
3432                                    serialize_pipeline.as_ref(),
3433                                    sink_expr,
3434                                    source_expr,
3435                                    deserialize_pipeline.as_ref(),
3436                                    *next_stmt_id,
3437                                );
3438                            }
3439                            BuildersOrCallback::Callback(_, node_callback) => {
3440                                node_callback(node, next_stmt_id);
3441                            }
3442                        }
3443
3444                        *next_stmt_id += 1;
3445
3446                        ident_stack.push(receiver_stream_ident);
3447                    }
3448
3449                    HydroNode::ExternalInput {
3450                        instantiate_fn,
3451                        deserialize_fn: deserialize_pipeline,
3452                        ..
3453                    } => {
3454                        let receiver_stream_ident =
3455                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3456
3457                        match builders_or_callback {
3458                            BuildersOrCallback::Builders(graph_builders) => {
3459                                let (_, source_expr) = match instantiate_fn {
3460                                    DebugInstantiate::Building => (
3461                                        syn::parse_quote!(DUMMY_SINK),
3462                                        syn::parse_quote!(DUMMY_SOURCE),
3463                                    ),
3464
3465                                    DebugInstantiate::Finalized(finalized) => {
3466                                        (finalized.sink.clone(), finalized.source.clone())
3467                                    }
3468                                };
3469
3470                                graph_builders.create_external_source(
3471                                    &out_location,
3472                                    source_expr,
3473                                    &receiver_stream_ident,
3474                                    deserialize_pipeline.as_ref(),
3475                                    *next_stmt_id,
3476                                );
3477                            }
3478                            BuildersOrCallback::Callback(_, node_callback) => {
3479                                node_callback(node, next_stmt_id);
3480                            }
3481                        }
3482
3483                        *next_stmt_id += 1;
3484
3485                        ident_stack.push(receiver_stream_ident);
3486                    }
3487
3488                    HydroNode::Counter {
3489                        tag,
3490                        duration,
3491                        prefix,
3492                        ..
3493                    } => {
3494                        let input_ident = ident_stack.pop().unwrap();
3495
3496                        let counter_ident =
3497                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3498
3499                        match builders_or_callback {
3500                            BuildersOrCallback::Builders(graph_builders) => {
3501                                let arg = format!("{}({})", prefix, tag);
3502                                let builder = graph_builders.get_dfir_mut(&out_location);
3503                                builder.add_dfir(
3504                                    parse_quote! {
3505                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
3506                                    },
3507                                    None,
3508                                    Some(&next_stmt_id.to_string()),
3509                                );
3510                            }
3511                            BuildersOrCallback::Callback(_, node_callback) => {
3512                                node_callback(node, next_stmt_id);
3513                            }
3514                        }
3515
3516                        *next_stmt_id += 1;
3517
3518                        ident_stack.push(counter_ident);
3519                    }
3520                }
3521            },
3522            seen_tees,
3523            false,
3524        );
3525
3526        ident_stack
3527            .pop()
3528            .expect("ident_stack should have exactly one element after traversal")
3529    }
3530
3531    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3532        match self {
3533            HydroNode::Placeholder => {
3534                panic!()
3535            }
3536            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3537            HydroNode::Source { source, .. } => match source {
3538                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3539                HydroSource::ExternalNetwork()
3540                | HydroSource::Spin()
3541                | HydroSource::ClusterMembers(_)
3542                | HydroSource::Embedded(_) => {} // TODO: what goes here?
3543            },
3544            HydroNode::SingletonSource { value, .. } => {
3545                transform(value);
3546            }
3547            HydroNode::CycleSource { .. }
3548            | HydroNode::Tee { .. }
3549            | HydroNode::YieldConcat { .. }
3550            | HydroNode::BeginAtomic { .. }
3551            | HydroNode::EndAtomic { .. }
3552            | HydroNode::Batch { .. }
3553            | HydroNode::Chain { .. }
3554            | HydroNode::ChainFirst { .. }
3555            | HydroNode::CrossProduct { .. }
3556            | HydroNode::CrossSingleton { .. }
3557            | HydroNode::ResolveFutures { .. }
3558            | HydroNode::ResolveFuturesOrdered { .. }
3559            | HydroNode::Join { .. }
3560            | HydroNode::Difference { .. }
3561            | HydroNode::AntiJoin { .. }
3562            | HydroNode::DeferTick { .. }
3563            | HydroNode::Enumerate { .. }
3564            | HydroNode::Unique { .. }
3565            | HydroNode::Sort { .. } => {}
3566            HydroNode::Map { f, .. }
3567            | HydroNode::FlatMap { f, .. }
3568            | HydroNode::Filter { f, .. }
3569            | HydroNode::FilterMap { f, .. }
3570            | HydroNode::Inspect { f, .. }
3571            | HydroNode::Reduce { f, .. }
3572            | HydroNode::ReduceKeyed { f, .. }
3573            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3574                transform(f);
3575            }
3576            HydroNode::Fold { init, acc, .. }
3577            | HydroNode::Scan { init, acc, .. }
3578            | HydroNode::FoldKeyed { init, acc, .. } => {
3579                transform(init);
3580                transform(acc);
3581            }
3582            HydroNode::Network {
3583                serialize_fn,
3584                deserialize_fn,
3585                ..
3586            } => {
3587                if let Some(serialize_fn) = serialize_fn {
3588                    transform(serialize_fn);
3589                }
3590                if let Some(deserialize_fn) = deserialize_fn {
3591                    transform(deserialize_fn);
3592                }
3593            }
3594            HydroNode::ExternalInput { deserialize_fn, .. } => {
3595                if let Some(deserialize_fn) = deserialize_fn {
3596                    transform(deserialize_fn);
3597                }
3598            }
3599            HydroNode::Counter { duration, .. } => {
3600                transform(duration);
3601            }
3602        }
3603    }
3604
3605    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3606        &self.metadata().op
3607    }
3608
3609    pub fn metadata(&self) -> &HydroIrMetadata {
3610        match self {
3611            HydroNode::Placeholder => {
3612                panic!()
3613            }
3614            HydroNode::Cast { metadata, .. } => metadata,
3615            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3616            HydroNode::Source { metadata, .. } => metadata,
3617            HydroNode::SingletonSource { metadata, .. } => metadata,
3618            HydroNode::CycleSource { metadata, .. } => metadata,
3619            HydroNode::Tee { metadata, .. } => metadata,
3620            HydroNode::YieldConcat { metadata, .. } => metadata,
3621            HydroNode::BeginAtomic { metadata, .. } => metadata,
3622            HydroNode::EndAtomic { metadata, .. } => metadata,
3623            HydroNode::Batch { metadata, .. } => metadata,
3624            HydroNode::Chain { metadata, .. } => metadata,
3625            HydroNode::ChainFirst { metadata, .. } => metadata,
3626            HydroNode::CrossProduct { metadata, .. } => metadata,
3627            HydroNode::CrossSingleton { metadata, .. } => metadata,
3628            HydroNode::Join { metadata, .. } => metadata,
3629            HydroNode::Difference { metadata, .. } => metadata,
3630            HydroNode::AntiJoin { metadata, .. } => metadata,
3631            HydroNode::ResolveFutures { metadata, .. } => metadata,
3632            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3633            HydroNode::Map { metadata, .. } => metadata,
3634            HydroNode::FlatMap { metadata, .. } => metadata,
3635            HydroNode::Filter { metadata, .. } => metadata,
3636            HydroNode::FilterMap { metadata, .. } => metadata,
3637            HydroNode::DeferTick { metadata, .. } => metadata,
3638            HydroNode::Enumerate { metadata, .. } => metadata,
3639            HydroNode::Inspect { metadata, .. } => metadata,
3640            HydroNode::Unique { metadata, .. } => metadata,
3641            HydroNode::Sort { metadata, .. } => metadata,
3642            HydroNode::Scan { metadata, .. } => metadata,
3643            HydroNode::Fold { metadata, .. } => metadata,
3644            HydroNode::FoldKeyed { metadata, .. } => metadata,
3645            HydroNode::Reduce { metadata, .. } => metadata,
3646            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3647            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3648            HydroNode::ExternalInput { metadata, .. } => metadata,
3649            HydroNode::Network { metadata, .. } => metadata,
3650            HydroNode::Counter { metadata, .. } => metadata,
3651        }
3652    }
3653
3654    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3655        &mut self.metadata_mut().op
3656    }
3657
3658    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3659        match self {
3660            HydroNode::Placeholder => {
3661                panic!()
3662            }
3663            HydroNode::Cast { metadata, .. } => metadata,
3664            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3665            HydroNode::Source { metadata, .. } => metadata,
3666            HydroNode::SingletonSource { metadata, .. } => metadata,
3667            HydroNode::CycleSource { metadata, .. } => metadata,
3668            HydroNode::Tee { metadata, .. } => metadata,
3669            HydroNode::YieldConcat { metadata, .. } => metadata,
3670            HydroNode::BeginAtomic { metadata, .. } => metadata,
3671            HydroNode::EndAtomic { metadata, .. } => metadata,
3672            HydroNode::Batch { metadata, .. } => metadata,
3673            HydroNode::Chain { metadata, .. } => metadata,
3674            HydroNode::ChainFirst { metadata, .. } => metadata,
3675            HydroNode::CrossProduct { metadata, .. } => metadata,
3676            HydroNode::CrossSingleton { metadata, .. } => metadata,
3677            HydroNode::Join { metadata, .. } => metadata,
3678            HydroNode::Difference { metadata, .. } => metadata,
3679            HydroNode::AntiJoin { metadata, .. } => metadata,
3680            HydroNode::ResolveFutures { metadata, .. } => metadata,
3681            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3682            HydroNode::Map { metadata, .. } => metadata,
3683            HydroNode::FlatMap { metadata, .. } => metadata,
3684            HydroNode::Filter { metadata, .. } => metadata,
3685            HydroNode::FilterMap { metadata, .. } => metadata,
3686            HydroNode::DeferTick { metadata, .. } => metadata,
3687            HydroNode::Enumerate { metadata, .. } => metadata,
3688            HydroNode::Inspect { metadata, .. } => metadata,
3689            HydroNode::Unique { metadata, .. } => metadata,
3690            HydroNode::Sort { metadata, .. } => metadata,
3691            HydroNode::Scan { metadata, .. } => metadata,
3692            HydroNode::Fold { metadata, .. } => metadata,
3693            HydroNode::FoldKeyed { metadata, .. } => metadata,
3694            HydroNode::Reduce { metadata, .. } => metadata,
3695            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3696            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3697            HydroNode::ExternalInput { metadata, .. } => metadata,
3698            HydroNode::Network { metadata, .. } => metadata,
3699            HydroNode::Counter { metadata, .. } => metadata,
3700        }
3701    }
3702
3703    pub fn input(&self) -> Vec<&HydroNode> {
3704        match self {
3705            HydroNode::Placeholder => {
3706                panic!()
3707            }
3708            HydroNode::Source { .. }
3709            | HydroNode::SingletonSource { .. }
3710            | HydroNode::ExternalInput { .. }
3711            | HydroNode::CycleSource { .. }
3712            | HydroNode::Tee { .. } => {
3713                // Tee should find its input in separate special ways
3714                vec![]
3715            }
3716            HydroNode::Cast { inner, .. }
3717            | HydroNode::ObserveNonDet { inner, .. }
3718            | HydroNode::YieldConcat { inner, .. }
3719            | HydroNode::BeginAtomic { inner, .. }
3720            | HydroNode::EndAtomic { inner, .. }
3721            | HydroNode::Batch { inner, .. } => {
3722                vec![inner]
3723            }
3724            HydroNode::Chain { first, second, .. } => {
3725                vec![first, second]
3726            }
3727            HydroNode::ChainFirst { first, second, .. } => {
3728                vec![first, second]
3729            }
3730            HydroNode::CrossProduct { left, right, .. }
3731            | HydroNode::CrossSingleton { left, right, .. }
3732            | HydroNode::Join { left, right, .. } => {
3733                vec![left, right]
3734            }
3735            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3736                vec![pos, neg]
3737            }
3738            HydroNode::Map { input, .. }
3739            | HydroNode::FlatMap { input, .. }
3740            | HydroNode::Filter { input, .. }
3741            | HydroNode::FilterMap { input, .. }
3742            | HydroNode::Sort { input, .. }
3743            | HydroNode::DeferTick { input, .. }
3744            | HydroNode::Enumerate { input, .. }
3745            | HydroNode::Inspect { input, .. }
3746            | HydroNode::Unique { input, .. }
3747            | HydroNode::Network { input, .. }
3748            | HydroNode::Counter { input, .. }
3749            | HydroNode::ResolveFutures { input, .. }
3750            | HydroNode::ResolveFuturesOrdered { input, .. }
3751            | HydroNode::Fold { input, .. }
3752            | HydroNode::FoldKeyed { input, .. }
3753            | HydroNode::Reduce { input, .. }
3754            | HydroNode::ReduceKeyed { input, .. }
3755            | HydroNode::Scan { input, .. } => {
3756                vec![input]
3757            }
3758            HydroNode::ReduceKeyedWatermark {
3759                input, watermark, ..
3760            } => {
3761                vec![input, watermark]
3762            }
3763        }
3764    }
3765
3766    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3767        self.input()
3768            .iter()
3769            .map(|input_node| input_node.metadata())
3770            .collect()
3771    }
3772
3773    pub fn print_root(&self) -> String {
3774        match self {
3775            HydroNode::Placeholder => {
3776                panic!()
3777            }
3778            HydroNode::Cast { .. } => "Cast()".to_owned(),
3779            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3780            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3781            HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3782            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3783            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3784            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3785            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3786            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3787            HydroNode::Batch { .. } => "Batch()".to_owned(),
3788            HydroNode::Chain { first, second, .. } => {
3789                format!("Chain({}, {})", first.print_root(), second.print_root())
3790            }
3791            HydroNode::ChainFirst { first, second, .. } => {
3792                format!(
3793                    "ChainFirst({}, {})",
3794                    first.print_root(),
3795                    second.print_root()
3796                )
3797            }
3798            HydroNode::CrossProduct { left, right, .. } => {
3799                format!(
3800                    "CrossProduct({}, {})",
3801                    left.print_root(),
3802                    right.print_root()
3803                )
3804            }
3805            HydroNode::CrossSingleton { left, right, .. } => {
3806                format!(
3807                    "CrossSingleton({}, {})",
3808                    left.print_root(),
3809                    right.print_root()
3810                )
3811            }
3812            HydroNode::Join { left, right, .. } => {
3813                format!("Join({}, {})", left.print_root(), right.print_root())
3814            }
3815            HydroNode::Difference { pos, neg, .. } => {
3816                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3817            }
3818            HydroNode::AntiJoin { pos, neg, .. } => {
3819                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3820            }
3821            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3822            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3823            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3824            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3825            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3826            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3827            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3828            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3829            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3830            HydroNode::Unique { .. } => "Unique()".to_owned(),
3831            HydroNode::Sort { .. } => "Sort()".to_owned(),
3832            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3833            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3834            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3835            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3836            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3837            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3838            HydroNode::Network { .. } => "Network()".to_owned(),
3839            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3840            HydroNode::Counter { tag, duration, .. } => {
3841                format!("Counter({:?}, {:?})", tag, duration)
3842            }
3843        }
3844    }
3845}
3846
3847#[cfg(feature = "build")]
3848fn instantiate_network<'a, D>(
3849    from_location: &LocationId,
3850    to_location: &LocationId,
3851    processes: &SparseSecondaryMap<LocationKey, D::Process>,
3852    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
3853) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3854where
3855    D: Deploy<'a>,
3856{
3857    let ((sink, source), connect_fn) = match (from_location, to_location) {
3858        (&LocationId::Process(from), &LocationId::Process(to)) => {
3859            let from_node = processes
3860                .get(from)
3861                .unwrap_or_else(|| {
3862                    panic!("A process used in the graph was not instantiated: {}", from)
3863                })
3864                .clone();
3865            let to_node = processes
3866                .get(to)
3867                .unwrap_or_else(|| {
3868                    panic!("A process used in the graph was not instantiated: {}", to)
3869                })
3870                .clone();
3871
3872            let sink_port = from_node.next_port();
3873            let source_port = to_node.next_port();
3874
3875            (
3876                D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3877                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3878            )
3879        }
3880        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
3881            let from_node = processes
3882                .get(from)
3883                .unwrap_or_else(|| {
3884                    panic!("A process used in the graph was not instantiated: {}", from)
3885                })
3886                .clone();
3887            let to_node = clusters
3888                .get(to)
3889                .unwrap_or_else(|| {
3890                    panic!("A cluster used in the graph was not instantiated: {}", to)
3891                })
3892                .clone();
3893
3894            let sink_port = from_node.next_port();
3895            let source_port = to_node.next_port();
3896
3897            (
3898                D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3899                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3900            )
3901        }
3902        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
3903            let from_node = clusters
3904                .get(from)
3905                .unwrap_or_else(|| {
3906                    panic!("A cluster used in the graph was not instantiated: {}", from)
3907                })
3908                .clone();
3909            let to_node = processes
3910                .get(to)
3911                .unwrap_or_else(|| {
3912                    panic!("A process used in the graph was not instantiated: {}", to)
3913                })
3914                .clone();
3915
3916            let sink_port = from_node.next_port();
3917            let source_port = to_node.next_port();
3918
3919            (
3920                D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3921                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
3922            )
3923        }
3924        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
3925            let from_node = clusters
3926                .get(from)
3927                .unwrap_or_else(|| {
3928                    panic!("A cluster used in the graph was not instantiated: {}", from)
3929                })
3930                .clone();
3931            let to_node = clusters
3932                .get(to)
3933                .unwrap_or_else(|| {
3934                    panic!("A cluster used in the graph was not instantiated: {}", to)
3935                })
3936                .clone();
3937
3938            let sink_port = from_node.next_port();
3939            let source_port = to_node.next_port();
3940
3941            (
3942                D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3943                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
3944            )
3945        }
3946        (LocationId::Tick(_, _), _) => panic!(),
3947        (_, LocationId::Tick(_, _)) => panic!(),
3948        (LocationId::Atomic(_), _) => panic!(),
3949        (_, LocationId::Atomic(_)) => panic!(),
3950    };
3951    (sink, source, connect_fn)
3952}
3953
3954#[cfg(test)]
3955mod test {
3956    use std::mem::size_of;
3957
3958    use stageleft::{QuotedWithContext, q};
3959
3960    use super::*;
3961
3962    #[test]
3963    #[cfg_attr(
3964        not(feature = "build"),
3965        ignore = "expects inclusion of feature-gated fields"
3966    )]
3967    fn hydro_node_size() {
3968        assert_eq!(size_of::<HydroNode>(), 240);
3969    }
3970
3971    #[test]
3972    #[cfg_attr(
3973        not(feature = "build"),
3974        ignore = "expects inclusion of feature-gated fields"
3975    )]
3976    fn hydro_root_size() {
3977        assert_eq!(size_of::<HydroRoot>(), 136);
3978    }
3979
3980    #[test]
3981    fn test_simplify_q_macro_basic() {
3982        // Test basic non-q! expression
3983        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
3984        let result = simplify_q_macro(simple_expr.clone());
3985        assert_eq!(result, simple_expr);
3986    }
3987
3988    #[test]
3989    fn test_simplify_q_macro_actual_stageleft_call() {
3990        // Test a simplified version of what a real stageleft call might look like
3991        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
3992        let result = simplify_q_macro(stageleft_call);
3993        // This should be processed by our visitor and simplified to q!(...)
3994        // since we detect the stageleft::runtime_support::fn_* pattern
3995        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3996    }
3997
3998    #[test]
3999    fn test_closure_no_pipe_at_start() {
4000        // Test a closure that does not start with a pipe
4001        let stageleft_call = q!({
4002            let foo = 123;
4003            move |b: usize| b + foo
4004        })
4005        .splice_fn1_ctx(&());
4006        let result = simplify_q_macro(stageleft_call);
4007        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4008    }
4009}