Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
37/// A closure expression bundled with any singleton references it captures.
38///
39/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
40/// alongside the closure's expression. This allows per-closure tracking of singleton
41/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
42pub struct ClosureExpr {
43    pub expr: DebugExpr,
44    pub singleton_refs: Vec<(syn::Ident, HydroNode)>,
45}
46
47impl Clone for ClosureExpr {
48    fn clone(&self) -> Self {
49        Self {
50            expr: self.expr.clone(),
51            singleton_refs: self
52                .singleton_refs
53                .iter()
54                .map(|(ident, node)| {
55                    let cloned_node = match node {
56                        HydroNode::Singleton { inner, metadata } => HydroNode::Singleton {
57                            inner: SharedNode(inner.0.clone()),
58                            metadata: metadata.clone(),
59                        },
60                        _ => panic!("singleton_refs should only contain HydroNode::Singleton"),
61                    };
62                    (ident.clone(), cloned_node)
63                })
64                .collect(),
65        }
66    }
67}
68
69impl Hash for ClosureExpr {
70    fn hash<H: Hasher>(&self, state: &mut H) {
71        self.expr.hash(state);
72        // singleton_refs are structural children (like HydroIrMetadata), not
73        // identity-defining. Two closures with the same expr but different
74        // captured refs are the same closure text — the refs only affect codegen.
75    }
76}
77
78impl serde::Serialize for ClosureExpr {
79    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
80        use serde::ser::SerializeStruct;
81        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
82        s.serialize_field("expr", &self.expr)?;
83        s.serialize_field(
84            "singleton_refs",
85            &SerializableSingletonRefs(&self.singleton_refs),
86        )?;
87        s.end()
88    }
89}
90
91struct SerializableSingletonRefs<'a>(&'a [(syn::Ident, HydroNode)]);
92
93impl serde::Serialize for SerializableSingletonRefs<'_> {
94    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95        use serde::ser::SerializeSeq;
96        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
97        for (ident, node) in self.0 {
98            seq.serialize_element(&(ident.to_string(), node))?;
99        }
100        seq.end()
101    }
102}
103
104impl Debug for ClosureExpr {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        Debug::fmt(&self.expr, f)
107    }
108}
109
110impl Display for ClosureExpr {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        Display::fmt(&self.expr, f)
113    }
114}
115
116impl From<syn::Expr> for ClosureExpr {
117    fn from(expr: syn::Expr) -> Self {
118        Self {
119            expr: DebugExpr(Box::new(expr)),
120            singleton_refs: Vec::new(),
121        }
122    }
123}
124
125impl From<DebugExpr> for ClosureExpr {
126    fn from(expr: DebugExpr) -> Self {
127        Self {
128            expr,
129            singleton_refs: Vec::new(),
130        }
131    }
132}
133
134impl ClosureExpr {
135    pub fn new(expr: DebugExpr, singleton_refs: Vec<(syn::Ident, HydroNode)>) -> Self {
136        Self {
137            expr,
138            singleton_refs,
139        }
140    }
141
142    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
143        Self {
144            expr: self.expr.clone(),
145            singleton_refs: self
146                .singleton_refs
147                .iter()
148                .map(|(ident, node)| (ident.clone(), node.deep_clone(seen_tees)))
149                .collect(),
150        }
151    }
152
153    pub fn transform_children(
154        &mut self,
155        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
156        seen_tees: &mut SeenSharedNodes,
157    ) {
158        for (_ident, ref_node) in self.singleton_refs.iter_mut() {
159            transform(ref_node, seen_tees);
160        }
161    }
162
163    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
164    /// replacing local singleton ref idents with `#dfir_ident` references.
165    #[cfg(feature = "build")]
166    pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
167        if self.singleton_refs.is_empty() {
168            self.expr.0.to_token_stream()
169        } else {
170            let ref_idents = (0..self.singleton_refs.len())
171                .map(|_| ident_stack.pop().unwrap())
172                .collect::<Vec<_>>()
173                .into_iter()
174                .rev()
175                .collect::<Vec<_>>();
176            let local_idents = self
177                .singleton_refs
178                .iter()
179                .map(|(local_ident, _)| local_ident);
180            let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
181            let expr = &self.expr.0;
182            quote! {
183                {
184                    #(
185                        let #local_idents = #hash #ref_idents;
186                    )*
187                    #expr
188                }
189            }
190        }
191    }
192}
193
194/// Wrapper that displays only the tokens of a parsed expr.
195///
196/// Boxes `syn::Type` which is ~240 bytes.
197#[derive(Clone, Hash)]
198pub struct DebugExpr(pub Box<syn::Expr>);
199
200impl serde::Serialize for DebugExpr {
201    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
202        serializer.serialize_str(&self.to_string())
203    }
204}
205
206impl From<syn::Expr> for DebugExpr {
207    fn from(expr: syn::Expr) -> Self {
208        Self(Box::new(expr))
209    }
210}
211
212impl Deref for DebugExpr {
213    type Target = syn::Expr;
214
215    fn deref(&self) -> &Self::Target {
216        &self.0
217    }
218}
219
220impl ToTokens for DebugExpr {
221    fn to_tokens(&self, tokens: &mut TokenStream) {
222        self.0.to_tokens(tokens);
223    }
224}
225
226impl Debug for DebugExpr {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        write!(f, "{}", self.0.to_token_stream())
229    }
230}
231
232impl Display for DebugExpr {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        let original = self.0.as_ref().clone();
235        let simplified = simplify_q_macro(original);
236
237        // For now, just use quote formatting without trying to parse as a statement
238        // This avoids the syn::parse_quote! issues entirely
239        write!(f, "q!({})", quote::quote!(#simplified))
240    }
241}
242
243/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
244fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
245    // Try to parse the token string as a syn::Expr
246    // Use a visitor to simplify q! macro expansions
247    let mut simplifier = QMacroSimplifier::new();
248    simplifier.visit_expr_mut(&mut expr);
249
250    // If we found and simplified a q! macro, return the simplified version
251    if let Some(simplified) = simplifier.simplified_result {
252        simplified
253    } else {
254        expr
255    }
256}
257
258/// AST visitor that simplifies q! macro expansions
259#[derive(Default)]
260pub struct QMacroSimplifier {
261    pub simplified_result: Option<syn::Expr>,
262}
263
264impl QMacroSimplifier {
265    pub fn new() -> Self {
266        Self::default()
267    }
268}
269
270impl VisitMut for QMacroSimplifier {
271    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
272        // Check if we already found a result to avoid further processing
273        if self.simplified_result.is_some() {
274            return;
275        }
276
277        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
278            // Look for calls to stageleft::runtime_support::fn*
279            && self.is_stageleft_runtime_support_call(&path_expr.path)
280            // Try to extract the closure from the arguments
281            && let Some(closure) = self.extract_closure_from_args(&call.args)
282        {
283            self.simplified_result = Some(closure);
284            return;
285        }
286
287        // Continue visiting child expressions using the default implementation
288        // Use the default visitor to avoid infinite recursion
289        syn::visit_mut::visit_expr_mut(self, expr);
290    }
291}
292
293impl QMacroSimplifier {
294    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
295        // Check if this is a call to stageleft::runtime_support::fn*
296        if let Some(last_segment) = path.segments.last() {
297            let fn_name = last_segment.ident.to_string();
298            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
299            fn_name.contains("_type_hint")
300                && path.segments.len() > 2
301                && path.segments[0].ident == "stageleft"
302                && path.segments[1].ident == "runtime_support"
303        } else {
304            false
305        }
306    }
307
308    fn extract_closure_from_args(
309        &self,
310        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
311    ) -> Option<syn::Expr> {
312        // Look through the arguments for a closure expression
313        for arg in args {
314            if let syn::Expr::Closure(_) = arg {
315                return Some(arg.clone());
316            }
317            // Also check for closures nested in other expressions (like blocks)
318            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
319                return Some(closure_expr);
320            }
321        }
322        None
323    }
324
325    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
326        let mut visitor = ClosureFinder {
327            found_closure: None,
328            prefer_inner_blocks: true,
329        };
330        visitor.visit_expr(expr);
331        visitor.found_closure
332    }
333}
334
335/// Visitor that finds closures in expressions with special block handling
336struct ClosureFinder {
337    found_closure: Option<syn::Expr>,
338    prefer_inner_blocks: bool,
339}
340
341impl<'ast> Visit<'ast> for ClosureFinder {
342    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
343        // If we already found a closure, don't continue searching
344        if self.found_closure.is_some() {
345            return;
346        }
347
348        match expr {
349            syn::Expr::Closure(_) => {
350                self.found_closure = Some(expr.clone());
351            }
352            syn::Expr::Block(block) if self.prefer_inner_blocks => {
353                // Special handling for blocks - look for inner blocks that contain closures
354                for stmt in &block.block.stmts {
355                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
356                        && let syn::Expr::Block(_) = stmt_expr
357                    {
358                        // Check if this nested block contains a closure
359                        let mut inner_visitor = ClosureFinder {
360                            found_closure: None,
361                            prefer_inner_blocks: false, // Avoid infinite recursion
362                        };
363                        inner_visitor.visit_expr(stmt_expr);
364                        if inner_visitor.found_closure.is_some() {
365                            // Found a closure in an inner block, return that block
366                            self.found_closure = Some(stmt_expr.clone());
367                            return;
368                        }
369                    }
370                }
371
372                // If no inner block with closure found, continue with normal visitation
373                visit::visit_expr(self, expr);
374
375                // If we found a closure, just return the closure itself, not the whole block
376                // unless we're in the special case where we want the containing block
377                if self.found_closure.is_some() {
378                    // The closure was found during visitation, no need to wrap in block
379                }
380            }
381            _ => {
382                // Use default visitor behavior for all other expressions
383                visit::visit_expr(self, expr);
384            }
385        }
386    }
387}
388
389/// Debug displays the type's tokens.
390///
391/// Boxes `syn::Type` which is ~320 bytes.
392#[derive(Clone, PartialEq, Eq, Hash)]
393pub struct DebugType(pub Box<syn::Type>);
394
395impl From<syn::Type> for DebugType {
396    fn from(t: syn::Type) -> Self {
397        Self(Box::new(t))
398    }
399}
400
401impl Deref for DebugType {
402    type Target = syn::Type;
403
404    fn deref(&self) -> &Self::Target {
405        &self.0
406    }
407}
408
409impl ToTokens for DebugType {
410    fn to_tokens(&self, tokens: &mut TokenStream) {
411        self.0.to_tokens(tokens);
412    }
413}
414
415impl Debug for DebugType {
416    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
417        write!(f, "{}", self.0.to_token_stream())
418    }
419}
420
421impl serde::Serialize for DebugType {
422    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
423        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
424    }
425}
426
427fn serialize_backtrace_as_span<S: serde::Serializer>(
428    backtrace: &Backtrace,
429    serializer: S,
430) -> Result<S::Ok, S::Error> {
431    match backtrace.format_span() {
432        Some(span) => serializer.serialize_some(&span),
433        None => serializer.serialize_none(),
434    }
435}
436
437fn serialize_ident<S: serde::Serializer>(
438    ident: &syn::Ident,
439    serializer: S,
440) -> Result<S::Ok, S::Error> {
441    serializer.serialize_str(&ident.to_string())
442}
443
444pub enum DebugInstantiate {
445    Building,
446    Finalized(Box<DebugInstantiateFinalized>),
447}
448
449impl serde::Serialize for DebugInstantiate {
450    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
451        match self {
452            DebugInstantiate::Building => {
453                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
454            }
455            DebugInstantiate::Finalized(_) => {
456                panic!(
457                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
458                )
459            }
460        }
461    }
462}
463
464#[cfg_attr(
465    not(feature = "build"),
466    expect(
467        dead_code,
468        reason = "sink, source unused without `feature = \"build\"`."
469    )
470)]
471pub struct DebugInstantiateFinalized {
472    sink: syn::Expr,
473    source: syn::Expr,
474    connect_fn: Option<Box<dyn FnOnce()>>,
475}
476
477impl From<DebugInstantiateFinalized> for DebugInstantiate {
478    fn from(f: DebugInstantiateFinalized) -> Self {
479        Self::Finalized(Box::new(f))
480    }
481}
482
483impl Debug for DebugInstantiate {
484    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
485        write!(f, "<network instantiate>")
486    }
487}
488
489impl Hash for DebugInstantiate {
490    fn hash<H: Hasher>(&self, _state: &mut H) {
491        // Do nothing
492    }
493}
494
495impl Clone for DebugInstantiate {
496    fn clone(&self) -> Self {
497        match self {
498            DebugInstantiate::Building => DebugInstantiate::Building,
499            DebugInstantiate::Finalized(_) => {
500                panic!("DebugInstantiate::Finalized should not be cloned")
501            }
502        }
503    }
504}
505
506/// Tracks the instantiation state of a `ClusterMembers` source.
507///
508/// During `compile_network`, the first `ClusterMembers` node for a given
509/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
510/// receives the expression returned by `Deploy::cluster_membership_stream`.
511/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
512/// during code-gen they simply reference the tee output of the first node
513/// instead of creating a redundant `source_stream`.
514#[derive(Debug, Hash, Clone, serde::Serialize)]
515pub enum ClusterMembersState {
516    /// Not yet instantiated.
517    Uninit,
518    /// The primary instance: holds the stream expression and will emit
519    /// `source_stream(expr) -> tee()` during code-gen.
520    Stream(DebugExpr),
521    /// A secondary instance that references the tee output of the primary.
522    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
523    /// can derive the deterministic tee ident without extra state.
524    Tee(LocationId, LocationId),
525}
526
527/// A source in a Hydro graph, where data enters the graph.
528#[derive(Debug, Hash, Clone, serde::Serialize)]
529pub enum HydroSource {
530    Stream(DebugExpr),
531    ExternalNetwork(),
532    Iter(DebugExpr),
533    Spin(),
534    ClusterMembers(LocationId, ClusterMembersState),
535    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
536    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
537}
538
539#[cfg(feature = "build")]
540/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
541/// and simulations.
542///
543/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
544/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
545pub trait DfirBuilder {
546    /// Whether the representation of singletons should include intermediate states.
547    fn singleton_intermediates(&self) -> bool;
548
549    /// Gets the DFIR builder for the given location, creating it if necessary.
550    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
551
552    #[expect(clippy::too_many_arguments, reason = "TODO")]
553    fn batch(
554        &mut self,
555        in_ident: syn::Ident,
556        in_location: &LocationId,
557        in_kind: &CollectionKind,
558        out_ident: &syn::Ident,
559        out_location: &LocationId,
560        op_meta: &HydroIrOpMetadata,
561        fold_hooked_idents: &HashSet<String>,
562    );
563    fn yield_from_tick(
564        &mut self,
565        in_ident: syn::Ident,
566        in_location: &LocationId,
567        in_kind: &CollectionKind,
568        out_ident: &syn::Ident,
569        out_location: &LocationId,
570    );
571
572    fn begin_atomic(
573        &mut self,
574        in_ident: syn::Ident,
575        in_location: &LocationId,
576        in_kind: &CollectionKind,
577        out_ident: &syn::Ident,
578        out_location: &LocationId,
579        op_meta: &HydroIrOpMetadata,
580    );
581    fn end_atomic(
582        &mut self,
583        in_ident: syn::Ident,
584        in_location: &LocationId,
585        in_kind: &CollectionKind,
586        out_ident: &syn::Ident,
587    );
588
589    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
590    fn observe_nondet(
591        &mut self,
592        trusted: bool,
593        location: &LocationId,
594        in_ident: syn::Ident,
595        in_kind: &CollectionKind,
596        out_ident: &syn::Ident,
597        out_kind: &CollectionKind,
598        op_meta: &HydroIrOpMetadata,
599    );
600
601    #[expect(clippy::too_many_arguments, reason = "TODO")]
602    fn merge_ordered(
603        &mut self,
604        location: &LocationId,
605        first_ident: syn::Ident,
606        second_ident: syn::Ident,
607        out_ident: &syn::Ident,
608        in_kind: &CollectionKind,
609        op_meta: &HydroIrOpMetadata,
610        operator_tag: Option<&str>,
611    );
612
613    #[expect(clippy::too_many_arguments, reason = "TODO")]
614    fn create_network(
615        &mut self,
616        from: &LocationId,
617        to: &LocationId,
618        input_ident: syn::Ident,
619        out_ident: &syn::Ident,
620        serialize: Option<&DebugExpr>,
621        sink: syn::Expr,
622        source: syn::Expr,
623        deserialize: Option<&DebugExpr>,
624        tag_id: usize,
625        networking_info: &crate::networking::NetworkingInfo,
626    );
627
628    fn create_external_source(
629        &mut self,
630        on: &LocationId,
631        source_expr: syn::Expr,
632        out_ident: &syn::Ident,
633        deserialize: Option<&DebugExpr>,
634        tag_id: usize,
635    );
636
637    fn create_external_output(
638        &mut self,
639        on: &LocationId,
640        sink_expr: syn::Expr,
641        input_ident: &syn::Ident,
642        serialize: Option<&DebugExpr>,
643        tag_id: usize,
644    );
645
646    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
647    /// Returns the new input ident to use for the fold if a hook was emitted.
648    fn emit_fold_hook(
649        &mut self,
650        location: &LocationId,
651        in_ident: &syn::Ident,
652        in_kind: &CollectionKind,
653        op_meta: &HydroIrOpMetadata,
654    ) -> Option<syn::Ident>;
655}
656
657#[cfg(feature = "build")]
658impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
659    fn singleton_intermediates(&self) -> bool {
660        false
661    }
662
663    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
664        self.entry(location.root().key())
665            .expect("location was removed")
666            .or_default()
667    }
668
669    fn batch(
670        &mut self,
671        in_ident: syn::Ident,
672        in_location: &LocationId,
673        in_kind: &CollectionKind,
674        out_ident: &syn::Ident,
675        _out_location: &LocationId,
676        _op_meta: &HydroIrOpMetadata,
677        _fold_hooked_idents: &HashSet<String>,
678    ) {
679        let builder = self.get_dfir_mut(in_location.root());
680        if in_kind.is_bounded()
681            && matches!(
682                in_kind,
683                CollectionKind::Singleton { .. }
684                    | CollectionKind::Optional { .. }
685                    | CollectionKind::KeyedSingleton { .. }
686            )
687        {
688            assert!(in_location.is_top_level());
689            builder.add_dfir(
690                parse_quote! {
691                    #out_ident = #in_ident -> persist::<'static>();
692                },
693                None,
694                None,
695            );
696        } else {
697            builder.add_dfir(
698                parse_quote! {
699                    #out_ident = #in_ident;
700                },
701                None,
702                None,
703            );
704        }
705    }
706
707    fn yield_from_tick(
708        &mut self,
709        in_ident: syn::Ident,
710        in_location: &LocationId,
711        _in_kind: &CollectionKind,
712        out_ident: &syn::Ident,
713        _out_location: &LocationId,
714    ) {
715        let builder = self.get_dfir_mut(in_location.root());
716        builder.add_dfir(
717            parse_quote! {
718                #out_ident = #in_ident;
719            },
720            None,
721            None,
722        );
723    }
724
725    fn begin_atomic(
726        &mut self,
727        in_ident: syn::Ident,
728        in_location: &LocationId,
729        _in_kind: &CollectionKind,
730        out_ident: &syn::Ident,
731        _out_location: &LocationId,
732        _op_meta: &HydroIrOpMetadata,
733    ) {
734        let builder = self.get_dfir_mut(in_location.root());
735        builder.add_dfir(
736            parse_quote! {
737                #out_ident = #in_ident;
738            },
739            None,
740            None,
741        );
742    }
743
744    fn end_atomic(
745        &mut self,
746        in_ident: syn::Ident,
747        in_location: &LocationId,
748        _in_kind: &CollectionKind,
749        out_ident: &syn::Ident,
750    ) {
751        let builder = self.get_dfir_mut(in_location.root());
752        builder.add_dfir(
753            parse_quote! {
754                #out_ident = #in_ident;
755            },
756            None,
757            None,
758        );
759    }
760
761    fn observe_nondet(
762        &mut self,
763        _trusted: bool,
764        location: &LocationId,
765        in_ident: syn::Ident,
766        _in_kind: &CollectionKind,
767        out_ident: &syn::Ident,
768        _out_kind: &CollectionKind,
769        _op_meta: &HydroIrOpMetadata,
770    ) {
771        let builder = self.get_dfir_mut(location);
772        builder.add_dfir(
773            parse_quote! {
774                #out_ident = #in_ident;
775            },
776            None,
777            None,
778        );
779    }
780
781    fn merge_ordered(
782        &mut self,
783        location: &LocationId,
784        first_ident: syn::Ident,
785        second_ident: syn::Ident,
786        out_ident: &syn::Ident,
787        _in_kind: &CollectionKind,
788        _op_meta: &HydroIrOpMetadata,
789        operator_tag: Option<&str>,
790    ) {
791        let builder = self.get_dfir_mut(location);
792        builder.add_dfir(
793            parse_quote! {
794                #out_ident = union();
795                #first_ident -> [0]#out_ident;
796                #second_ident -> [1]#out_ident;
797            },
798            None,
799            operator_tag,
800        );
801    }
802
803    fn create_network(
804        &mut self,
805        from: &LocationId,
806        to: &LocationId,
807        input_ident: syn::Ident,
808        out_ident: &syn::Ident,
809        serialize: Option<&DebugExpr>,
810        sink: syn::Expr,
811        source: syn::Expr,
812        deserialize: Option<&DebugExpr>,
813        tag_id: usize,
814        _networking_info: &crate::networking::NetworkingInfo,
815    ) {
816        let sender_builder = self.get_dfir_mut(from);
817        if let Some(serialize_pipeline) = serialize {
818            sender_builder.add_dfir(
819                parse_quote! {
820                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
821                },
822                None,
823                // operator tag separates send and receive, which otherwise have the same next_stmt_id
824                Some(&format!("send{}", tag_id)),
825            );
826        } else {
827            sender_builder.add_dfir(
828                parse_quote! {
829                    #input_ident -> dest_sink(#sink);
830                },
831                None,
832                Some(&format!("send{}", tag_id)),
833            );
834        }
835
836        let receiver_builder = self.get_dfir_mut(to);
837        if let Some(deserialize_pipeline) = deserialize {
838            receiver_builder.add_dfir(
839                parse_quote! {
840                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
841                },
842                None,
843                Some(&format!("recv{}", tag_id)),
844            );
845        } else {
846            receiver_builder.add_dfir(
847                parse_quote! {
848                    #out_ident = source_stream(#source);
849                },
850                None,
851                Some(&format!("recv{}", tag_id)),
852            );
853        }
854    }
855
856    fn create_external_source(
857        &mut self,
858        on: &LocationId,
859        source_expr: syn::Expr,
860        out_ident: &syn::Ident,
861        deserialize: Option<&DebugExpr>,
862        tag_id: usize,
863    ) {
864        let receiver_builder = self.get_dfir_mut(on);
865        if let Some(deserialize_pipeline) = deserialize {
866            receiver_builder.add_dfir(
867                parse_quote! {
868                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
869                },
870                None,
871                Some(&format!("recv{}", tag_id)),
872            );
873        } else {
874            receiver_builder.add_dfir(
875                parse_quote! {
876                    #out_ident = source_stream(#source_expr);
877                },
878                None,
879                Some(&format!("recv{}", tag_id)),
880            );
881        }
882    }
883
884    fn create_external_output(
885        &mut self,
886        on: &LocationId,
887        sink_expr: syn::Expr,
888        input_ident: &syn::Ident,
889        serialize: Option<&DebugExpr>,
890        tag_id: usize,
891    ) {
892        let sender_builder = self.get_dfir_mut(on);
893        if let Some(serialize_fn) = serialize {
894            sender_builder.add_dfir(
895                parse_quote! {
896                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
897                },
898                None,
899                // operator tag separates send and receive, which otherwise have the same next_stmt_id
900                Some(&format!("send{}", tag_id)),
901            );
902        } else {
903            sender_builder.add_dfir(
904                parse_quote! {
905                    #input_ident -> dest_sink(#sink_expr);
906                },
907                None,
908                Some(&format!("send{}", tag_id)),
909            );
910        }
911    }
912
913    fn emit_fold_hook(
914        &mut self,
915        _location: &LocationId,
916        _in_ident: &syn::Ident,
917        _in_kind: &CollectionKind,
918        _op_meta: &HydroIrOpMetadata,
919    ) -> Option<syn::Ident> {
920        None
921    }
922}
923
924#[cfg(feature = "build")]
925pub enum BuildersOrCallback<'a, L, N>
926where
927    L: FnMut(&mut HydroRoot, &mut usize),
928    N: FnMut(&mut HydroNode, &mut usize),
929{
930    Builders(&'a mut dyn DfirBuilder),
931    Callback(L, N),
932}
933
934/// An root in a Hydro graph, which is an pipeline that doesn't emit
935/// any downstream values. Traversals over the dataflow graph and
936/// generating DFIR IR start from roots.
937#[derive(Debug, Hash, serde::Serialize)]
938pub enum HydroRoot {
939    ForEach {
940        f: DebugExpr,
941        input: Box<HydroNode>,
942        op_metadata: HydroIrOpMetadata,
943    },
944    SendExternal {
945        to_external_key: LocationKey,
946        to_port_id: ExternalPortId,
947        to_many: bool,
948        unpaired: bool,
949        serialize_fn: Option<DebugExpr>,
950        instantiate_fn: DebugInstantiate,
951        input: Box<HydroNode>,
952        op_metadata: HydroIrOpMetadata,
953    },
954    DestSink {
955        sink: DebugExpr,
956        input: Box<HydroNode>,
957        op_metadata: HydroIrOpMetadata,
958    },
959    CycleSink {
960        cycle_id: CycleId,
961        input: Box<HydroNode>,
962        op_metadata: HydroIrOpMetadata,
963    },
964    EmbeddedOutput {
965        #[serde(serialize_with = "serialize_ident")]
966        ident: syn::Ident,
967        input: Box<HydroNode>,
968        op_metadata: HydroIrOpMetadata,
969    },
970    Null {
971        input: Box<HydroNode>,
972        op_metadata: HydroIrOpMetadata,
973    },
974}
975
976impl HydroRoot {
977    #[cfg(feature = "build")]
978    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
979    pub fn compile_network<'a, D>(
980        &mut self,
981        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
982        seen_tees: &mut SeenSharedNodes,
983        seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
984        processes: &SparseSecondaryMap<LocationKey, D::Process>,
985        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
986        externals: &SparseSecondaryMap<LocationKey, D::External>,
987        env: &mut D::InstantiateEnv,
988    ) where
989        D: Deploy<'a>,
990    {
991        let refcell_extra_stmts = RefCell::new(extra_stmts);
992        let refcell_env = RefCell::new(env);
993        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
994        self.transform_bottom_up(
995            &mut |l| {
996                if let HydroRoot::SendExternal {
997                    input,
998                    to_external_key,
999                    to_port_id,
1000                    to_many,
1001                    unpaired,
1002                    instantiate_fn,
1003                    ..
1004                } = l
1005                {
1006                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1007                        DebugInstantiate::Building => {
1008                            let to_node = externals
1009                                .get(*to_external_key)
1010                                .unwrap_or_else(|| {
1011                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
1012                                })
1013                                .clone();
1014
1015                            match input.metadata().location_id.root() {
1016                                &LocationId::Process(process_key) => {
1017                                    if *to_many {
1018                                        (
1019                                            (
1020                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1021                                                parse_quote!(DUMMY),
1022                                            ),
1023                                            Box::new(|| {}) as Box<dyn FnOnce()>,
1024                                        )
1025                                    } else {
1026                                        let from_node = processes
1027                                            .get(process_key)
1028                                            .unwrap_or_else(|| {
1029                                                panic!("A process used in the graph was not instantiated: {}", process_key)
1030                                            })
1031                                            .clone();
1032
1033                                        let sink_port = from_node.next_port();
1034                                        let source_port = to_node.next_port();
1035
1036                                        if *unpaired {
1037                                            use stageleft::quote_type;
1038                                            use tokio_util::codec::LengthDelimitedCodec;
1039
1040                                            to_node.register(*to_port_id, source_port.clone());
1041
1042                                            let _ = D::e2o_source(
1043                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1044                                                &to_node, &source_port,
1045                                                &from_node, &sink_port,
1046                                                &quote_type::<LengthDelimitedCodec>(),
1047                                                format!("{}_{}", *to_external_key, *to_port_id)
1048                                            );
1049                                        }
1050
1051                                        (
1052                                            (
1053                                                D::o2e_sink(
1054                                                    &from_node,
1055                                                    &sink_port,
1056                                                    &to_node,
1057                                                    &source_port,
1058                                                    format!("{}_{}", *to_external_key, *to_port_id)
1059                                                ),
1060                                                parse_quote!(DUMMY),
1061                                            ),
1062                                            if *unpaired {
1063                                                D::e2o_connect(
1064                                                    &to_node,
1065                                                    &source_port,
1066                                                    &from_node,
1067                                                    &sink_port,
1068                                                    *to_many,
1069                                                    NetworkHint::Auto,
1070                                                )
1071                                            } else {
1072                                                Box::new(|| {}) as Box<dyn FnOnce()>
1073                                            },
1074                                        )
1075                                    }
1076                                }
1077                                LocationId::Cluster(cluster_key) => {
1078                                    let from_node = clusters
1079                                        .get(*cluster_key)
1080                                        .unwrap_or_else(|| {
1081                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1082                                        })
1083                                        .clone();
1084
1085                                    let sink_port = from_node.next_port();
1086                                    let source_port = to_node.next_port();
1087
1088                                    if *unpaired {
1089                                        to_node.register(*to_port_id, source_port.clone());
1090                                    }
1091
1092                                    (
1093                                        (
1094                                            D::m2e_sink(
1095                                                &from_node,
1096                                                &sink_port,
1097                                                &to_node,
1098                                                &source_port,
1099                                                format!("{}_{}", *to_external_key, *to_port_id)
1100                                            ),
1101                                            parse_quote!(DUMMY),
1102                                        ),
1103                                        Box::new(|| {}) as Box<dyn FnOnce()>,
1104                                    )
1105                                }
1106                                _ => panic!()
1107                            }
1108                        },
1109
1110                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1111                    };
1112
1113                    *instantiate_fn = DebugInstantiateFinalized {
1114                        sink: sink_expr,
1115                        source: source_expr,
1116                        connect_fn: Some(connect_fn),
1117                    }
1118                    .into();
1119                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1120                    let element_type = match &input.metadata().collection_kind {
1121                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1122                        _ => panic!("Embedded output must have Stream collection kind"),
1123                    };
1124                    let location_key = match input.metadata().location_id.root() {
1125                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1126                        _ => panic!("Embedded output must be on a process or cluster"),
1127                    };
1128                    D::register_embedded_output(
1129                        &mut refcell_env.borrow_mut(),
1130                        location_key,
1131                        ident,
1132                        &element_type,
1133                    );
1134                }
1135            },
1136            &mut |n| {
1137                if let HydroNode::Network {
1138                    name,
1139                    networking_info,
1140                    input,
1141                    instantiate_fn,
1142                    metadata,
1143                    ..
1144                } = n
1145                {
1146                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1147                        DebugInstantiate::Building => instantiate_network::<D>(
1148                            &mut refcell_env.borrow_mut(),
1149                            input.metadata().location_id.root(),
1150                            metadata.location_id.root(),
1151                            processes,
1152                            clusters,
1153                            name.as_deref(),
1154                            networking_info,
1155                        ),
1156
1157                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1158                    };
1159
1160                    *instantiate_fn = DebugInstantiateFinalized {
1161                        sink: sink_expr,
1162                        source: source_expr,
1163                        connect_fn: Some(connect_fn),
1164                    }
1165                    .into();
1166                } else if let HydroNode::ExternalInput {
1167                    from_external_key,
1168                    from_port_id,
1169                    from_many,
1170                    codec_type,
1171                    port_hint,
1172                    instantiate_fn,
1173                    metadata,
1174                    ..
1175                } = n
1176                {
1177                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1178                        DebugInstantiate::Building => {
1179                            let from_node = externals
1180                                .get(*from_external_key)
1181                                .unwrap_or_else(|| {
1182                                    panic!(
1183                                        "A external used in the graph was not instantiated: {}",
1184                                        from_external_key,
1185                                    )
1186                                })
1187                                .clone();
1188
1189                            match metadata.location_id.root() {
1190                                &LocationId::Process(process_key) => {
1191                                    let to_node = processes
1192                                        .get(process_key)
1193                                        .unwrap_or_else(|| {
1194                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1195                                        })
1196                                        .clone();
1197
1198                                    let sink_port = from_node.next_port();
1199                                    let source_port = to_node.next_port();
1200
1201                                    from_node.register(*from_port_id, sink_port.clone());
1202
1203                                    (
1204                                        (
1205                                            parse_quote!(DUMMY),
1206                                            if *from_many {
1207                                                D::e2o_many_source(
1208                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1209                                                    &to_node, &source_port,
1210                                                    codec_type.0.as_ref(),
1211                                                    format!("{}_{}", *from_external_key, *from_port_id)
1212                                                )
1213                                            } else {
1214                                                D::e2o_source(
1215                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1216                                                    &from_node, &sink_port,
1217                                                    &to_node, &source_port,
1218                                                    codec_type.0.as_ref(),
1219                                                    format!("{}_{}", *from_external_key, *from_port_id)
1220                                                )
1221                                            },
1222                                        ),
1223                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1224                                    )
1225                                }
1226                                LocationId::Cluster(cluster_key) => {
1227                                    let to_node = clusters
1228                                        .get(*cluster_key)
1229                                        .unwrap_or_else(|| {
1230                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1231                                        })
1232                                        .clone();
1233
1234                                    let sink_port = from_node.next_port();
1235                                    let source_port = to_node.next_port();
1236
1237                                    from_node.register(*from_port_id, sink_port.clone());
1238
1239                                    (
1240                                        (
1241                                            parse_quote!(DUMMY),
1242                                            D::e2m_source(
1243                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1244                                                &from_node, &sink_port,
1245                                                &to_node, &source_port,
1246                                                codec_type.0.as_ref(),
1247                                                format!("{}_{}", *from_external_key, *from_port_id)
1248                                            ),
1249                                        ),
1250                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1251                                    )
1252                                }
1253                                _ => panic!()
1254                            }
1255                        },
1256
1257                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1258                    };
1259
1260                    *instantiate_fn = DebugInstantiateFinalized {
1261                        sink: sink_expr,
1262                        source: source_expr,
1263                        connect_fn: Some(connect_fn),
1264                    }
1265                    .into();
1266                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1267                    let element_type = match &metadata.collection_kind {
1268                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1269                        _ => panic!("Embedded source must have Stream collection kind"),
1270                    };
1271                    let location_key = match metadata.location_id.root() {
1272                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1273                        _ => panic!("Embedded source must be on a process or cluster"),
1274                    };
1275                    D::register_embedded_stream_input(
1276                        &mut refcell_env.borrow_mut(),
1277                        location_key,
1278                        ident,
1279                        &element_type,
1280                    );
1281                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1282                    let element_type = match &metadata.collection_kind {
1283                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1284                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1285                    };
1286                    let location_key = match metadata.location_id.root() {
1287                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1288                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1289                    };
1290                    D::register_embedded_singleton_input(
1291                        &mut refcell_env.borrow_mut(),
1292                        location_key,
1293                        ident,
1294                        &element_type,
1295                    );
1296                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1297                    match state {
1298                        ClusterMembersState::Uninit => {
1299                            let at_location = metadata.location_id.root().clone();
1300                            let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1301                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1302                                // First occurrence: call cluster_membership_stream and mark as Stream.
1303                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1304                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1305                                    &(),
1306                                );
1307                                *state = ClusterMembersState::Stream(expr.into());
1308                            } else {
1309                                // Already instantiated for this (at, target) pair: just tee.
1310                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1311                            }
1312                        }
1313                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1314                            panic!("cluster members already finalized");
1315                        }
1316                    }
1317                }
1318            },
1319            seen_tees,
1320            false,
1321        );
1322    }
1323
1324    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1325        self.transform_bottom_up(
1326            &mut |l| {
1327                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1328                    match instantiate_fn {
1329                        DebugInstantiate::Building => panic!("network not built"),
1330
1331                        DebugInstantiate::Finalized(finalized) => {
1332                            (finalized.connect_fn.take().unwrap())();
1333                        }
1334                    }
1335                }
1336            },
1337            &mut |n| {
1338                if let HydroNode::Network { instantiate_fn, .. }
1339                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1340                {
1341                    match instantiate_fn {
1342                        DebugInstantiate::Building => panic!("network not built"),
1343
1344                        DebugInstantiate::Finalized(finalized) => {
1345                            (finalized.connect_fn.take().unwrap())();
1346                        }
1347                    }
1348                }
1349            },
1350            seen_tees,
1351            false,
1352        );
1353    }
1354
1355    pub fn transform_bottom_up(
1356        &mut self,
1357        transform_root: &mut impl FnMut(&mut HydroRoot),
1358        transform_node: &mut impl FnMut(&mut HydroNode),
1359        seen_tees: &mut SeenSharedNodes,
1360        check_well_formed: bool,
1361    ) {
1362        self.transform_children(
1363            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1364            seen_tees,
1365        );
1366
1367        transform_root(self);
1368    }
1369
1370    pub fn transform_children(
1371        &mut self,
1372        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1373        seen_tees: &mut SeenSharedNodes,
1374    ) {
1375        match self {
1376            HydroRoot::ForEach { input, .. }
1377            | HydroRoot::SendExternal { input, .. }
1378            | HydroRoot::DestSink { input, .. }
1379            | HydroRoot::CycleSink { input, .. }
1380            | HydroRoot::EmbeddedOutput { input, .. }
1381            | HydroRoot::Null { input, .. } => {
1382                transform(input, seen_tees);
1383            }
1384        }
1385    }
1386
1387    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1388        match self {
1389            HydroRoot::ForEach {
1390                f,
1391                input,
1392                op_metadata,
1393            } => HydroRoot::ForEach {
1394                f: f.clone(),
1395                input: Box::new(input.deep_clone(seen_tees)),
1396                op_metadata: op_metadata.clone(),
1397            },
1398            HydroRoot::SendExternal {
1399                to_external_key,
1400                to_port_id,
1401                to_many,
1402                unpaired,
1403                serialize_fn,
1404                instantiate_fn,
1405                input,
1406                op_metadata,
1407            } => HydroRoot::SendExternal {
1408                to_external_key: *to_external_key,
1409                to_port_id: *to_port_id,
1410                to_many: *to_many,
1411                unpaired: *unpaired,
1412                serialize_fn: serialize_fn.clone(),
1413                instantiate_fn: instantiate_fn.clone(),
1414                input: Box::new(input.deep_clone(seen_tees)),
1415                op_metadata: op_metadata.clone(),
1416            },
1417            HydroRoot::DestSink {
1418                sink,
1419                input,
1420                op_metadata,
1421            } => HydroRoot::DestSink {
1422                sink: sink.clone(),
1423                input: Box::new(input.deep_clone(seen_tees)),
1424                op_metadata: op_metadata.clone(),
1425            },
1426            HydroRoot::CycleSink {
1427                cycle_id,
1428                input,
1429                op_metadata,
1430            } => HydroRoot::CycleSink {
1431                cycle_id: *cycle_id,
1432                input: Box::new(input.deep_clone(seen_tees)),
1433                op_metadata: op_metadata.clone(),
1434            },
1435            HydroRoot::EmbeddedOutput {
1436                ident,
1437                input,
1438                op_metadata,
1439            } => HydroRoot::EmbeddedOutput {
1440                ident: ident.clone(),
1441                input: Box::new(input.deep_clone(seen_tees)),
1442                op_metadata: op_metadata.clone(),
1443            },
1444            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1445                input: Box::new(input.deep_clone(seen_tees)),
1446                op_metadata: op_metadata.clone(),
1447            },
1448        }
1449    }
1450
1451    #[cfg(feature = "build")]
1452    pub fn emit(
1453        &mut self,
1454        graph_builders: &mut dyn DfirBuilder,
1455        seen_tees: &mut SeenSharedNodes,
1456        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1457        next_stmt_id: &mut usize,
1458        fold_hooked_idents: &mut HashSet<String>,
1459    ) {
1460        self.emit_core(
1461            &mut BuildersOrCallback::<
1462                fn(&mut HydroRoot, &mut usize),
1463                fn(&mut HydroNode, &mut usize),
1464            >::Builders(graph_builders),
1465            seen_tees,
1466            built_tees,
1467            next_stmt_id,
1468            fold_hooked_idents,
1469        );
1470    }
1471
1472    #[cfg(feature = "build")]
1473    pub fn emit_core(
1474        &mut self,
1475        builders_or_callback: &mut BuildersOrCallback<
1476            impl FnMut(&mut HydroRoot, &mut usize),
1477            impl FnMut(&mut HydroNode, &mut usize),
1478        >,
1479        seen_tees: &mut SeenSharedNodes,
1480        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1481        next_stmt_id: &mut usize,
1482        fold_hooked_idents: &mut HashSet<String>,
1483    ) {
1484        match self {
1485            HydroRoot::ForEach { f, input, .. } => {
1486                let input_ident = input.emit_core(
1487                    builders_or_callback,
1488                    seen_tees,
1489                    built_tees,
1490                    next_stmt_id,
1491                    fold_hooked_idents,
1492                );
1493
1494                match builders_or_callback {
1495                    BuildersOrCallback::Builders(graph_builders) => {
1496                        graph_builders
1497                            .get_dfir_mut(&input.metadata().location_id)
1498                            .add_dfir(
1499                                parse_quote! {
1500                                    #input_ident -> for_each(#f);
1501                                },
1502                                None,
1503                                Some(&next_stmt_id.to_string()),
1504                            );
1505                    }
1506                    BuildersOrCallback::Callback(leaf_callback, _) => {
1507                        leaf_callback(self, next_stmt_id);
1508                    }
1509                }
1510
1511                *next_stmt_id += 1;
1512            }
1513
1514            HydroRoot::SendExternal {
1515                serialize_fn,
1516                instantiate_fn,
1517                input,
1518                ..
1519            } => {
1520                let input_ident = input.emit_core(
1521                    builders_or_callback,
1522                    seen_tees,
1523                    built_tees,
1524                    next_stmt_id,
1525                    fold_hooked_idents,
1526                );
1527
1528                match builders_or_callback {
1529                    BuildersOrCallback::Builders(graph_builders) => {
1530                        let (sink_expr, _) = match instantiate_fn {
1531                            DebugInstantiate::Building => (
1532                                syn::parse_quote!(DUMMY_SINK),
1533                                syn::parse_quote!(DUMMY_SOURCE),
1534                            ),
1535
1536                            DebugInstantiate::Finalized(finalized) => {
1537                                (finalized.sink.clone(), finalized.source.clone())
1538                            }
1539                        };
1540
1541                        graph_builders.create_external_output(
1542                            &input.metadata().location_id,
1543                            sink_expr,
1544                            &input_ident,
1545                            serialize_fn.as_ref(),
1546                            *next_stmt_id,
1547                        );
1548                    }
1549                    BuildersOrCallback::Callback(leaf_callback, _) => {
1550                        leaf_callback(self, next_stmt_id);
1551                    }
1552                }
1553
1554                *next_stmt_id += 1;
1555            }
1556
1557            HydroRoot::DestSink { sink, input, .. } => {
1558                let input_ident = input.emit_core(
1559                    builders_or_callback,
1560                    seen_tees,
1561                    built_tees,
1562                    next_stmt_id,
1563                    fold_hooked_idents,
1564                );
1565
1566                match builders_or_callback {
1567                    BuildersOrCallback::Builders(graph_builders) => {
1568                        graph_builders
1569                            .get_dfir_mut(&input.metadata().location_id)
1570                            .add_dfir(
1571                                parse_quote! {
1572                                    #input_ident -> dest_sink(#sink);
1573                                },
1574                                None,
1575                                Some(&next_stmt_id.to_string()),
1576                            );
1577                    }
1578                    BuildersOrCallback::Callback(leaf_callback, _) => {
1579                        leaf_callback(self, next_stmt_id);
1580                    }
1581                }
1582
1583                *next_stmt_id += 1;
1584            }
1585
1586            HydroRoot::CycleSink {
1587                cycle_id, input, ..
1588            } => {
1589                let input_ident = input.emit_core(
1590                    builders_or_callback,
1591                    seen_tees,
1592                    built_tees,
1593                    next_stmt_id,
1594                    fold_hooked_idents,
1595                );
1596
1597                match builders_or_callback {
1598                    BuildersOrCallback::Builders(graph_builders) => {
1599                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1600                            CollectionKind::KeyedSingleton {
1601                                key_type,
1602                                value_type,
1603                                ..
1604                            }
1605                            | CollectionKind::KeyedStream {
1606                                key_type,
1607                                value_type,
1608                                ..
1609                            } => {
1610                                parse_quote!((#key_type, #value_type))
1611                            }
1612                            CollectionKind::Stream { element_type, .. }
1613                            | CollectionKind::Singleton { element_type, .. }
1614                            | CollectionKind::Optional { element_type, .. } => {
1615                                parse_quote!(#element_type)
1616                            }
1617                        };
1618
1619                        let cycle_id_ident = cycle_id.as_ident();
1620                        graph_builders
1621                            .get_dfir_mut(&input.metadata().location_id)
1622                            .add_dfir(
1623                                parse_quote! {
1624                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1625                                },
1626                                None,
1627                                None,
1628                            );
1629                    }
1630                    // No ID, no callback
1631                    BuildersOrCallback::Callback(_, _) => {}
1632                }
1633            }
1634
1635            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1636                let input_ident = input.emit_core(
1637                    builders_or_callback,
1638                    seen_tees,
1639                    built_tees,
1640                    next_stmt_id,
1641                    fold_hooked_idents,
1642                );
1643
1644                match builders_or_callback {
1645                    BuildersOrCallback::Builders(graph_builders) => {
1646                        graph_builders
1647                            .get_dfir_mut(&input.metadata().location_id)
1648                            .add_dfir(
1649                                parse_quote! {
1650                                    #input_ident -> for_each(&mut #ident);
1651                                },
1652                                None,
1653                                Some(&next_stmt_id.to_string()),
1654                            );
1655                    }
1656                    BuildersOrCallback::Callback(leaf_callback, _) => {
1657                        leaf_callback(self, next_stmt_id);
1658                    }
1659                }
1660
1661                *next_stmt_id += 1;
1662            }
1663
1664            HydroRoot::Null { input, .. } => {
1665                let input_ident = input.emit_core(
1666                    builders_or_callback,
1667                    seen_tees,
1668                    built_tees,
1669                    next_stmt_id,
1670                    fold_hooked_idents,
1671                );
1672
1673                match builders_or_callback {
1674                    BuildersOrCallback::Builders(graph_builders) => {
1675                        graph_builders
1676                            .get_dfir_mut(&input.metadata().location_id)
1677                            .add_dfir(
1678                                parse_quote! {
1679                                    #input_ident -> for_each(|_| {});
1680                                },
1681                                None,
1682                                Some(&next_stmt_id.to_string()),
1683                            );
1684                    }
1685                    BuildersOrCallback::Callback(leaf_callback, _) => {
1686                        leaf_callback(self, next_stmt_id);
1687                    }
1688                }
1689
1690                *next_stmt_id += 1;
1691            }
1692        }
1693    }
1694
1695    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1696        match self {
1697            HydroRoot::ForEach { op_metadata, .. }
1698            | HydroRoot::SendExternal { op_metadata, .. }
1699            | HydroRoot::DestSink { op_metadata, .. }
1700            | HydroRoot::CycleSink { op_metadata, .. }
1701            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1702            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1703        }
1704    }
1705
1706    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1707        match self {
1708            HydroRoot::ForEach { op_metadata, .. }
1709            | HydroRoot::SendExternal { op_metadata, .. }
1710            | HydroRoot::DestSink { op_metadata, .. }
1711            | HydroRoot::CycleSink { op_metadata, .. }
1712            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1713            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1714        }
1715    }
1716
1717    pub fn input(&self) -> &HydroNode {
1718        match self {
1719            HydroRoot::ForEach { input, .. }
1720            | HydroRoot::SendExternal { input, .. }
1721            | HydroRoot::DestSink { input, .. }
1722            | HydroRoot::CycleSink { input, .. }
1723            | HydroRoot::EmbeddedOutput { input, .. }
1724            | HydroRoot::Null { input, .. } => input,
1725        }
1726    }
1727
1728    pub fn input_metadata(&self) -> &HydroIrMetadata {
1729        self.input().metadata()
1730    }
1731
1732    pub fn print_root(&self) -> String {
1733        match self {
1734            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1735            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1736            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1737            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1738            HydroRoot::EmbeddedOutput { ident, .. } => {
1739                format!("EmbeddedOutput({})", ident)
1740            }
1741            HydroRoot::Null { .. } => "Null".to_owned(),
1742        }
1743    }
1744
1745    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1746        match self {
1747            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1748                transform(f);
1749            }
1750            HydroRoot::SendExternal { .. }
1751            | HydroRoot::CycleSink { .. }
1752            | HydroRoot::EmbeddedOutput { .. }
1753            | HydroRoot::Null { .. } => {}
1754        }
1755    }
1756}
1757
1758#[cfg(feature = "build")]
1759fn tick_of(loc: &LocationId) -> Option<ClockId> {
1760    match loc {
1761        LocationId::Tick(id, _) => Some(*id),
1762        LocationId::Atomic(inner) => tick_of(inner),
1763        _ => None,
1764    }
1765}
1766
1767#[cfg(feature = "build")]
1768fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1769    match loc {
1770        LocationId::Tick(id, inner) => {
1771            *id = uf_find(uf, *id);
1772            remap_location(inner, uf);
1773        }
1774        LocationId::Atomic(inner) => {
1775            remap_location(inner, uf);
1776        }
1777        LocationId::Process(_) | LocationId::Cluster(_) => {}
1778    }
1779}
1780
1781#[cfg(feature = "build")]
1782fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1783    let p = *parent.get(&x).unwrap_or(&x);
1784    if p == x {
1785        return x;
1786    }
1787    let root = uf_find(parent, p);
1788    parent.insert(x, root);
1789    root
1790}
1791
1792#[cfg(feature = "build")]
1793fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1794    let ra = uf_find(parent, a);
1795    let rb = uf_find(parent, b);
1796    if ra != rb {
1797        parent.insert(ra, rb);
1798    }
1799}
1800
1801/// Traverse the IR to build a union-find that unifies tick IDs connected
1802/// through `Batch` and `YieldConcat` nodes at atomic boundaries, then
1803/// rewrite all `LocationId`s to use the representative tick ID.
1804#[cfg(feature = "build")]
1805pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1806    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1807
1808    // Pass 1: collect unifications.
1809    transform_bottom_up(
1810        ir,
1811        &mut |_| {},
1812        &mut |node: &mut HydroNode| {
1813            if let HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } =
1814                node
1815                && let (Some(a), Some(b)) = (
1816                    tick_of(&inner.metadata().location_id),
1817                    tick_of(&metadata.location_id),
1818                )
1819            {
1820                uf_union(&mut uf, a, b);
1821            }
1822        },
1823        false,
1824    );
1825
1826    // Pass 2: rewrite all LocationIds.
1827    transform_bottom_up(
1828        ir,
1829        &mut |_| {},
1830        &mut |node: &mut HydroNode| {
1831            remap_location(&mut node.metadata_mut().location_id, &mut uf);
1832        },
1833        false,
1834    );
1835}
1836
1837#[cfg(feature = "build")]
1838pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1839    let mut builders = SecondaryMap::new();
1840    let mut seen_tees = HashMap::new();
1841    let mut built_tees = HashMap::new();
1842    let mut next_stmt_id = 0;
1843    let mut fold_hooked_idents = HashSet::new();
1844    for leaf in ir {
1845        leaf.emit(
1846            &mut builders,
1847            &mut seen_tees,
1848            &mut built_tees,
1849            &mut next_stmt_id,
1850            &mut fold_hooked_idents,
1851        );
1852    }
1853    builders
1854}
1855
1856#[cfg(feature = "build")]
1857pub fn traverse_dfir(
1858    ir: &mut [HydroRoot],
1859    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1860    transform_node: impl FnMut(&mut HydroNode, &mut usize),
1861) {
1862    let mut seen_tees = HashMap::new();
1863    let mut built_tees = HashMap::new();
1864    let mut next_stmt_id = 0;
1865    let mut fold_hooked_idents = HashSet::new();
1866    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1867    ir.iter_mut().for_each(|leaf| {
1868        leaf.emit_core(
1869            &mut callback,
1870            &mut seen_tees,
1871            &mut built_tees,
1872            &mut next_stmt_id,
1873            &mut fold_hooked_idents,
1874        );
1875    });
1876}
1877
1878pub fn transform_bottom_up(
1879    ir: &mut [HydroRoot],
1880    transform_root: &mut impl FnMut(&mut HydroRoot),
1881    transform_node: &mut impl FnMut(&mut HydroNode),
1882    check_well_formed: bool,
1883) {
1884    let mut seen_tees = HashMap::new();
1885    ir.iter_mut().for_each(|leaf| {
1886        leaf.transform_bottom_up(
1887            transform_root,
1888            transform_node,
1889            &mut seen_tees,
1890            check_well_formed,
1891        );
1892    });
1893}
1894
1895pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1896    let mut seen_tees = HashMap::new();
1897    ir.iter()
1898        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1899        .collect()
1900}
1901
1902type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1903thread_local! {
1904    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1905    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1906    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1907    /// on subsequent encounters, preventing infinite loops.
1908    static SERIALIZED_SHARED: PrintedTees
1909        = const { RefCell::new(None) };
1910}
1911
1912pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1913    PRINTED_TEES.with(|printed_tees| {
1914        let mut printed_tees_mut = printed_tees.borrow_mut();
1915        *printed_tees_mut = Some((0, HashMap::new()));
1916        drop(printed_tees_mut);
1917
1918        let ret = f();
1919
1920        let mut printed_tees_mut = printed_tees.borrow_mut();
1921        *printed_tees_mut = None;
1922
1923        ret
1924    })
1925}
1926
1927/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1928/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1929/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1930/// back-reference.  The tracking state is restored when `f` returns or panics.
1931pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1932    let _guard = SerializedSharedGuard::enter();
1933    f()
1934}
1935
1936/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
1937/// making `serialize_dedup_shared` re-entrant and panic-safe.
1938struct SerializedSharedGuard {
1939    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1940}
1941
1942impl SerializedSharedGuard {
1943    fn enter() -> Self {
1944        let previous = SERIALIZED_SHARED.with(|cell| {
1945            let mut guard = cell.borrow_mut();
1946            guard.replace((0, HashMap::new()))
1947        });
1948        Self { previous }
1949    }
1950}
1951
1952impl Drop for SerializedSharedGuard {
1953    fn drop(&mut self) {
1954        SERIALIZED_SHARED.with(|cell| {
1955            *cell.borrow_mut() = self.previous.take();
1956        });
1957    }
1958}
1959
1960pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1961
1962impl serde::Serialize for SharedNode {
1963    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
1964    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
1965    /// same subtree every time and, if the graph ever contains a cycle, loop
1966    /// forever.
1967    ///
1968    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
1969    /// integer id.  The first time we see a pointer we assign it the next id and
1970    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
1971    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
1972    /// recursion.  Requires an active `serialize_dedup_shared` scope.
1973    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1974        SERIALIZED_SHARED.with(|cell| {
1975            let mut guard = cell.borrow_mut();
1976            // (next_id, pointer → assigned_id)
1977            let state = guard.as_mut().ok_or_else(|| {
1978                serde::ser::Error::custom(
1979                    "SharedNode serialization requires an active serialize_dedup_shared scope",
1980                )
1981            })?;
1982            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1983
1984            if let Some(&id) = state.1.get(&ptr) {
1985                drop(guard);
1986                use serde::ser::SerializeMap;
1987                let mut map = serializer.serialize_map(Some(1))?;
1988                map.serialize_entry("$shared_ref", &id)?;
1989                map.end()
1990            } else {
1991                let id = state.0;
1992                state.0 += 1;
1993                state.1.insert(ptr, id);
1994                drop(guard);
1995
1996                use serde::ser::SerializeMap;
1997                let mut map = serializer.serialize_map(Some(2))?;
1998                map.serialize_entry("$shared", &id)?;
1999                map.serialize_entry("node", &*self.0.borrow())?;
2000                map.end()
2001            }
2002        })
2003    }
2004}
2005
2006impl SharedNode {
2007    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2008        Rc::as_ptr(&self.0)
2009    }
2010}
2011
2012impl Debug for SharedNode {
2013    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2014        PRINTED_TEES.with(|printed_tees| {
2015            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2016            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2017
2018            if let Some(printed_tees_mut) = printed_tees_mut {
2019                if let Some(existing) = printed_tees_mut
2020                    .1
2021                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2022                {
2023                    write!(f, "<shared {}>", existing)
2024                } else {
2025                    let next_id = printed_tees_mut.0;
2026                    printed_tees_mut.0 += 1;
2027                    printed_tees_mut
2028                        .1
2029                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2030                    drop(printed_tees_mut_borrow);
2031                    write!(f, "<shared {}>: ", next_id)?;
2032                    Debug::fmt(&self.0.borrow(), f)
2033                }
2034            } else {
2035                drop(printed_tees_mut_borrow);
2036                write!(f, "<shared>: ")?;
2037                Debug::fmt(&self.0.borrow(), f)
2038            }
2039        })
2040    }
2041}
2042
2043impl Hash for SharedNode {
2044    fn hash<H: Hasher>(&self, state: &mut H) {
2045        self.0.borrow_mut().hash(state);
2046    }
2047}
2048
2049#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2050pub enum BoundKind {
2051    Unbounded,
2052    Bounded,
2053}
2054
2055#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2056pub enum StreamOrder {
2057    NoOrder,
2058    TotalOrder,
2059}
2060
2061#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2062pub enum StreamRetry {
2063    AtLeastOnce,
2064    ExactlyOnce,
2065}
2066
2067#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2068pub enum KeyedSingletonBoundKind {
2069    Unbounded,
2070    MonotonicValue,
2071    BoundedValue,
2072    Bounded,
2073}
2074
2075#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2076pub enum SingletonBoundKind {
2077    Unbounded,
2078    Monotonic,
2079    Bounded,
2080}
2081
2082#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2083pub enum CollectionKind {
2084    Stream {
2085        bound: BoundKind,
2086        order: StreamOrder,
2087        retry: StreamRetry,
2088        element_type: DebugType,
2089    },
2090    Singleton {
2091        bound: SingletonBoundKind,
2092        element_type: DebugType,
2093    },
2094    Optional {
2095        bound: BoundKind,
2096        element_type: DebugType,
2097    },
2098    KeyedStream {
2099        bound: BoundKind,
2100        value_order: StreamOrder,
2101        value_retry: StreamRetry,
2102        key_type: DebugType,
2103        value_type: DebugType,
2104    },
2105    KeyedSingleton {
2106        bound: KeyedSingletonBoundKind,
2107        key_type: DebugType,
2108        value_type: DebugType,
2109    },
2110}
2111
2112impl CollectionKind {
2113    pub fn is_bounded(&self) -> bool {
2114        matches!(
2115            self,
2116            CollectionKind::Stream {
2117                bound: BoundKind::Bounded,
2118                ..
2119            } | CollectionKind::Singleton {
2120                bound: SingletonBoundKind::Bounded,
2121                ..
2122            } | CollectionKind::Optional {
2123                bound: BoundKind::Bounded,
2124                ..
2125            } | CollectionKind::KeyedStream {
2126                bound: BoundKind::Bounded,
2127                ..
2128            } | CollectionKind::KeyedSingleton {
2129                bound: KeyedSingletonBoundKind::Bounded,
2130                ..
2131            }
2132        )
2133    }
2134}
2135
2136#[derive(Clone, serde::Serialize)]
2137pub struct HydroIrMetadata {
2138    pub location_id: LocationId,
2139    pub collection_kind: CollectionKind,
2140    pub cardinality: Option<usize>,
2141    pub tag: Option<String>,
2142    pub op: HydroIrOpMetadata,
2143}
2144
2145// HydroIrMetadata shouldn't be used to hash or compare
2146impl Hash for HydroIrMetadata {
2147    fn hash<H: Hasher>(&self, _: &mut H) {}
2148}
2149
2150impl PartialEq for HydroIrMetadata {
2151    fn eq(&self, _: &Self) -> bool {
2152        true
2153    }
2154}
2155
2156impl Eq for HydroIrMetadata {}
2157
2158impl Debug for HydroIrMetadata {
2159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2160        f.debug_struct("HydroIrMetadata")
2161            .field("location_id", &self.location_id)
2162            .field("collection_kind", &self.collection_kind)
2163            .finish()
2164    }
2165}
2166
2167/// Metadata that is specific to the operator itself, rather than its outputs.
2168/// This is available on _both_ inner nodes and roots.
2169#[derive(Clone, serde::Serialize)]
2170pub struct HydroIrOpMetadata {
2171    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2172    pub backtrace: Backtrace,
2173    pub cpu_usage: Option<f64>,
2174    pub network_recv_cpu_usage: Option<f64>,
2175    pub id: Option<usize>,
2176}
2177
2178impl HydroIrOpMetadata {
2179    #[expect(
2180        clippy::new_without_default,
2181        reason = "explicit calls to new ensure correct backtrace bounds"
2182    )]
2183    pub fn new() -> HydroIrOpMetadata {
2184        Self::new_with_skip(1)
2185    }
2186
2187    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2188        HydroIrOpMetadata {
2189            backtrace: Backtrace::get_backtrace(2 + skip_count),
2190            cpu_usage: None,
2191            network_recv_cpu_usage: None,
2192            id: None,
2193        }
2194    }
2195}
2196
2197impl Debug for HydroIrOpMetadata {
2198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2199        f.debug_struct("HydroIrOpMetadata").finish()
2200    }
2201}
2202
2203impl Hash for HydroIrOpMetadata {
2204    fn hash<H: Hasher>(&self, _: &mut H) {}
2205}
2206
2207/// An intermediate node in a Hydro graph, which consumes data
2208/// from upstream nodes and emits data to downstream nodes.
2209#[derive(Debug, Hash, serde::Serialize)]
2210pub enum HydroNode {
2211    Placeholder,
2212
2213    /// Manually "casts" between two different collection kinds.
2214    ///
2215    /// Using this IR node requires special care, since it bypasses many of Hydro's core
2216    /// correctness checks. In particular, the user must ensure that every possible
2217    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
2218    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
2219    /// collection. This ensures that the simulator does not miss any possible outputs.
2220    Cast {
2221        inner: Box<HydroNode>,
2222        metadata: HydroIrMetadata,
2223    },
2224
2225    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
2226    /// interpretation of the input stream.
2227    ///
2228    /// In production, this simply passes through the input, but in simulation, this operator
2229    /// explicitly selects a randomized interpretation.
2230    ObserveNonDet {
2231        inner: Box<HydroNode>,
2232        trusted: bool, // if true, we do not need to simulate non-determinism
2233        metadata: HydroIrMetadata,
2234    },
2235
2236    Source {
2237        source: HydroSource,
2238        metadata: HydroIrMetadata,
2239    },
2240
2241    SingletonSource {
2242        value: DebugExpr,
2243        first_tick_only: bool,
2244        metadata: HydroIrMetadata,
2245    },
2246
2247    CycleSource {
2248        cycle_id: CycleId,
2249        metadata: HydroIrMetadata,
2250    },
2251
2252    Tee {
2253        inner: SharedNode,
2254        metadata: HydroIrMetadata,
2255    },
2256
2257    /// A singleton materialization point. Wraps a SharedNode so that:
2258    /// - The pipe output delivers the single item to one consumer
2259    /// - `#var` references can borrow the value from the singleton slot
2260    ///
2261    /// In DFIR codegen, emits `ident = inner_ident -> singleton()`.
2262    ///
2263    /// Uses the same `built_tees` dedup pattern as `Tee`.
2264    Singleton {
2265        inner: SharedNode,
2266        metadata: HydroIrMetadata,
2267    },
2268
2269    Partition {
2270        inner: SharedNode,
2271        f: ClosureExpr,
2272        is_true: bool,
2273        metadata: HydroIrMetadata,
2274    },
2275
2276    BeginAtomic {
2277        inner: Box<HydroNode>,
2278        metadata: HydroIrMetadata,
2279    },
2280
2281    EndAtomic {
2282        inner: Box<HydroNode>,
2283        metadata: HydroIrMetadata,
2284    },
2285
2286    Batch {
2287        inner: Box<HydroNode>,
2288        metadata: HydroIrMetadata,
2289    },
2290
2291    YieldConcat {
2292        inner: Box<HydroNode>,
2293        metadata: HydroIrMetadata,
2294    },
2295
2296    Chain {
2297        first: Box<HydroNode>,
2298        second: Box<HydroNode>,
2299        metadata: HydroIrMetadata,
2300    },
2301
2302    MergeOrdered {
2303        first: Box<HydroNode>,
2304        second: Box<HydroNode>,
2305        metadata: HydroIrMetadata,
2306    },
2307
2308    ChainFirst {
2309        first: Box<HydroNode>,
2310        second: Box<HydroNode>,
2311        metadata: HydroIrMetadata,
2312    },
2313
2314    CrossProduct {
2315        left: Box<HydroNode>,
2316        right: Box<HydroNode>,
2317        metadata: HydroIrMetadata,
2318    },
2319
2320    CrossSingleton {
2321        left: Box<HydroNode>,
2322        right: Box<HydroNode>,
2323        metadata: HydroIrMetadata,
2324    },
2325
2326    Join {
2327        left: Box<HydroNode>,
2328        right: Box<HydroNode>,
2329        metadata: HydroIrMetadata,
2330    },
2331
2332    /// Asymmetric join where the right (build) side is bounded.
2333    /// The build side is accumulated (stratum-delayed) into a hash table,
2334    /// then the left (probe) side streams through preserving its ordering.
2335    JoinHalf {
2336        left: Box<HydroNode>,
2337        right: Box<HydroNode>,
2338        metadata: HydroIrMetadata,
2339    },
2340
2341    Difference {
2342        pos: Box<HydroNode>,
2343        neg: Box<HydroNode>,
2344        metadata: HydroIrMetadata,
2345    },
2346
2347    AntiJoin {
2348        pos: Box<HydroNode>,
2349        neg: Box<HydroNode>,
2350        metadata: HydroIrMetadata,
2351    },
2352
2353    ResolveFutures {
2354        input: Box<HydroNode>,
2355        metadata: HydroIrMetadata,
2356    },
2357    ResolveFuturesBlocking {
2358        input: Box<HydroNode>,
2359        metadata: HydroIrMetadata,
2360    },
2361    ResolveFuturesOrdered {
2362        input: Box<HydroNode>,
2363        metadata: HydroIrMetadata,
2364    },
2365
2366    Map {
2367        f: ClosureExpr,
2368        input: Box<HydroNode>,
2369        metadata: HydroIrMetadata,
2370    },
2371    FlatMap {
2372        f: ClosureExpr,
2373        input: Box<HydroNode>,
2374        metadata: HydroIrMetadata,
2375    },
2376    FlatMapStreamBlocking {
2377        f: ClosureExpr,
2378        input: Box<HydroNode>,
2379        metadata: HydroIrMetadata,
2380    },
2381    Filter {
2382        f: ClosureExpr,
2383        input: Box<HydroNode>,
2384        metadata: HydroIrMetadata,
2385    },
2386    FilterMap {
2387        f: ClosureExpr,
2388        input: Box<HydroNode>,
2389        metadata: HydroIrMetadata,
2390    },
2391
2392    DeferTick {
2393        input: Box<HydroNode>,
2394        metadata: HydroIrMetadata,
2395    },
2396    Enumerate {
2397        input: Box<HydroNode>,
2398        metadata: HydroIrMetadata,
2399    },
2400    Inspect {
2401        f: ClosureExpr,
2402        input: Box<HydroNode>,
2403        metadata: HydroIrMetadata,
2404    },
2405
2406    Unique {
2407        input: Box<HydroNode>,
2408        metadata: HydroIrMetadata,
2409    },
2410
2411    Sort {
2412        input: Box<HydroNode>,
2413        metadata: HydroIrMetadata,
2414    },
2415    Fold {
2416        init: ClosureExpr,
2417        acc: ClosureExpr,
2418        input: Box<HydroNode>,
2419        metadata: HydroIrMetadata,
2420    },
2421
2422    Scan {
2423        init: ClosureExpr,
2424        acc: ClosureExpr,
2425        input: Box<HydroNode>,
2426        metadata: HydroIrMetadata,
2427    },
2428    ScanAsyncBlocking {
2429        init: ClosureExpr,
2430        acc: ClosureExpr,
2431        input: Box<HydroNode>,
2432        metadata: HydroIrMetadata,
2433    },
2434    FoldKeyed {
2435        init: ClosureExpr,
2436        acc: ClosureExpr,
2437        input: Box<HydroNode>,
2438        metadata: HydroIrMetadata,
2439    },
2440
2441    Reduce {
2442        f: ClosureExpr,
2443        input: Box<HydroNode>,
2444        metadata: HydroIrMetadata,
2445    },
2446    ReduceKeyed {
2447        f: ClosureExpr,
2448        input: Box<HydroNode>,
2449        metadata: HydroIrMetadata,
2450    },
2451    ReduceKeyedWatermark {
2452        f: ClosureExpr,
2453        input: Box<HydroNode>,
2454        watermark: Box<HydroNode>,
2455        metadata: HydroIrMetadata,
2456    },
2457
2458    Network {
2459        name: Option<String>,
2460        networking_info: crate::networking::NetworkingInfo,
2461        serialize_fn: Option<DebugExpr>,
2462        instantiate_fn: DebugInstantiate,
2463        deserialize_fn: Option<DebugExpr>,
2464        input: Box<HydroNode>,
2465        metadata: HydroIrMetadata,
2466    },
2467
2468    ExternalInput {
2469        from_external_key: LocationKey,
2470        from_port_id: ExternalPortId,
2471        from_many: bool,
2472        codec_type: DebugType,
2473        #[serde(skip)]
2474        port_hint: NetworkHint,
2475        instantiate_fn: DebugInstantiate,
2476        deserialize_fn: Option<DebugExpr>,
2477        metadata: HydroIrMetadata,
2478    },
2479
2480    Counter {
2481        tag: String,
2482        duration: DebugExpr,
2483        prefix: String,
2484        input: Box<HydroNode>,
2485        metadata: HydroIrMetadata,
2486    },
2487}
2488
2489pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2490pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2491
2492impl HydroNode {
2493    pub fn transform_bottom_up(
2494        &mut self,
2495        transform: &mut impl FnMut(&mut HydroNode),
2496        seen_tees: &mut SeenSharedNodes,
2497        check_well_formed: bool,
2498    ) {
2499        self.transform_children(
2500            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2501            seen_tees,
2502        );
2503
2504        transform(self);
2505
2506        let self_location = self.metadata().location_id.root();
2507
2508        if check_well_formed {
2509            match &*self {
2510                HydroNode::Network { .. } => {}
2511                _ => {
2512                    self.input_metadata().iter().for_each(|i| {
2513                        if i.location_id.root() != self_location {
2514                            panic!(
2515                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2516                                i,
2517                                i.location_id.root(),
2518                                self,
2519                                self_location
2520                            )
2521                        }
2522                    });
2523                }
2524            }
2525        }
2526    }
2527
2528    #[inline(always)]
2529    pub fn transform_children(
2530        &mut self,
2531        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2532        seen_tees: &mut SeenSharedNodes,
2533    ) {
2534        match self {
2535            HydroNode::Placeholder => {
2536                panic!();
2537            }
2538
2539            HydroNode::Source { .. }
2540            | HydroNode::SingletonSource { .. }
2541            | HydroNode::CycleSource { .. }
2542            | HydroNode::ExternalInput { .. } => {}
2543
2544            HydroNode::Tee { inner, .. } | HydroNode::Singleton { inner, .. } => {
2545                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2546                    *inner = SharedNode(transformed.clone());
2547                } else {
2548                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2549                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2550                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2551                    transform(&mut orig, seen_tees);
2552                    *transformed_cell.borrow_mut() = orig;
2553                    *inner = SharedNode(transformed_cell);
2554                }
2555            }
2556
2557            HydroNode::Partition { inner, f, .. } => {
2558                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2559                    *inner = SharedNode(transformed.clone());
2560                } else {
2561                    f.transform_children(&mut transform, seen_tees);
2562                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2563                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2564                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2565                    transform(&mut orig, seen_tees);
2566                    *transformed_cell.borrow_mut() = orig;
2567                    *inner = SharedNode(transformed_cell);
2568                }
2569            }
2570
2571            HydroNode::Cast { inner, .. }
2572            | HydroNode::ObserveNonDet { inner, .. }
2573            | HydroNode::BeginAtomic { inner, .. }
2574            | HydroNode::EndAtomic { inner, .. }
2575            | HydroNode::Batch { inner, .. }
2576            | HydroNode::YieldConcat { inner, .. } => {
2577                transform(inner.as_mut(), seen_tees);
2578            }
2579
2580            HydroNode::Chain { first, second, .. } => {
2581                transform(first.as_mut(), seen_tees);
2582                transform(second.as_mut(), seen_tees);
2583            }
2584
2585            HydroNode::MergeOrdered { first, second, .. } => {
2586                transform(first.as_mut(), seen_tees);
2587                transform(second.as_mut(), seen_tees);
2588            }
2589
2590            HydroNode::ChainFirst { first, second, .. } => {
2591                transform(first.as_mut(), seen_tees);
2592                transform(second.as_mut(), seen_tees);
2593            }
2594
2595            HydroNode::CrossSingleton { left, right, .. }
2596            | HydroNode::CrossProduct { left, right, .. }
2597            | HydroNode::Join { left, right, .. }
2598            | HydroNode::JoinHalf { left, right, .. } => {
2599                transform(left.as_mut(), seen_tees);
2600                transform(right.as_mut(), seen_tees);
2601            }
2602
2603            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2604                transform(pos.as_mut(), seen_tees);
2605                transform(neg.as_mut(), seen_tees);
2606            }
2607
2608            HydroNode::Map { f, input, .. } => {
2609                f.transform_children(&mut transform, seen_tees);
2610                transform(input.as_mut(), seen_tees);
2611            }
2612            HydroNode::FlatMap { f, input, .. }
2613            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2614            | HydroNode::Filter { f, input, .. }
2615            | HydroNode::FilterMap { f, input, .. }
2616            | HydroNode::Inspect { f, input, .. }
2617            | HydroNode::Reduce { f, input, .. }
2618            | HydroNode::ReduceKeyed { f, input, .. } => {
2619                f.transform_children(&mut transform, seen_tees);
2620                transform(input.as_mut(), seen_tees);
2621            }
2622            HydroNode::ReduceKeyedWatermark {
2623                f,
2624                input,
2625                watermark,
2626                ..
2627            } => {
2628                f.transform_children(&mut transform, seen_tees);
2629                transform(input.as_mut(), seen_tees);
2630                transform(watermark.as_mut(), seen_tees);
2631            }
2632            HydroNode::Fold {
2633                init, acc, input, ..
2634            }
2635            | HydroNode::Scan {
2636                init, acc, input, ..
2637            }
2638            | HydroNode::ScanAsyncBlocking {
2639                init, acc, input, ..
2640            }
2641            | HydroNode::FoldKeyed {
2642                init, acc, input, ..
2643            } => {
2644                init.transform_children(&mut transform, seen_tees);
2645                acc.transform_children(&mut transform, seen_tees);
2646                transform(input.as_mut(), seen_tees);
2647            }
2648            HydroNode::ResolveFutures { input, .. }
2649            | HydroNode::ResolveFuturesBlocking { input, .. }
2650            | HydroNode::ResolveFuturesOrdered { input, .. }
2651            | HydroNode::Sort { input, .. }
2652            | HydroNode::DeferTick { input, .. }
2653            | HydroNode::Enumerate { input, .. }
2654            | HydroNode::Unique { input, .. }
2655            | HydroNode::Network { input, .. }
2656            | HydroNode::Counter { input, .. } => {
2657                transform(input.as_mut(), seen_tees);
2658            }
2659        }
2660    }
2661
2662    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2663        match self {
2664            HydroNode::Placeholder => HydroNode::Placeholder,
2665            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2666                inner: Box::new(inner.deep_clone(seen_tees)),
2667                metadata: metadata.clone(),
2668            },
2669            HydroNode::ObserveNonDet {
2670                inner,
2671                trusted,
2672                metadata,
2673            } => HydroNode::ObserveNonDet {
2674                inner: Box::new(inner.deep_clone(seen_tees)),
2675                trusted: *trusted,
2676                metadata: metadata.clone(),
2677            },
2678            HydroNode::Source { source, metadata } => HydroNode::Source {
2679                source: source.clone(),
2680                metadata: metadata.clone(),
2681            },
2682            HydroNode::SingletonSource {
2683                value,
2684                first_tick_only,
2685                metadata,
2686            } => HydroNode::SingletonSource {
2687                value: value.clone(),
2688                first_tick_only: *first_tick_only,
2689                metadata: metadata.clone(),
2690            },
2691            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2692                cycle_id: *cycle_id,
2693                metadata: metadata.clone(),
2694            },
2695            HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
2696                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2697                    SharedNode(transformed.clone())
2698                } else {
2699                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2700                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2701                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2702                    *new_rc.borrow_mut() = cloned;
2703                    SharedNode(new_rc)
2704                };
2705                if matches!(self, HydroNode::Singleton { .. }) {
2706                    HydroNode::Singleton {
2707                        inner: cloned_inner,
2708                        metadata: metadata.clone(),
2709                    }
2710                } else {
2711                    HydroNode::Tee {
2712                        inner: cloned_inner,
2713                        metadata: metadata.clone(),
2714                    }
2715                }
2716            }
2717            HydroNode::Partition {
2718                inner,
2719                f,
2720                is_true,
2721                metadata,
2722            } => {
2723                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2724                    HydroNode::Partition {
2725                        inner: SharedNode(transformed.clone()),
2726                        f: f.deep_clone(seen_tees),
2727                        is_true: *is_true,
2728                        metadata: metadata.clone(),
2729                    }
2730                } else {
2731                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2732                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2733                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2734                    *new_rc.borrow_mut() = cloned;
2735                    HydroNode::Partition {
2736                        inner: SharedNode(new_rc),
2737                        f: f.deep_clone(seen_tees),
2738                        is_true: *is_true,
2739                        metadata: metadata.clone(),
2740                    }
2741                }
2742            }
2743            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2744                inner: Box::new(inner.deep_clone(seen_tees)),
2745                metadata: metadata.clone(),
2746            },
2747            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2748                inner: Box::new(inner.deep_clone(seen_tees)),
2749                metadata: metadata.clone(),
2750            },
2751            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2752                inner: Box::new(inner.deep_clone(seen_tees)),
2753                metadata: metadata.clone(),
2754            },
2755            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2756                inner: Box::new(inner.deep_clone(seen_tees)),
2757                metadata: metadata.clone(),
2758            },
2759            HydroNode::Chain {
2760                first,
2761                second,
2762                metadata,
2763            } => HydroNode::Chain {
2764                first: Box::new(first.deep_clone(seen_tees)),
2765                second: Box::new(second.deep_clone(seen_tees)),
2766                metadata: metadata.clone(),
2767            },
2768            HydroNode::MergeOrdered {
2769                first,
2770                second,
2771                metadata,
2772            } => HydroNode::MergeOrdered {
2773                first: Box::new(first.deep_clone(seen_tees)),
2774                second: Box::new(second.deep_clone(seen_tees)),
2775                metadata: metadata.clone(),
2776            },
2777            HydroNode::ChainFirst {
2778                first,
2779                second,
2780                metadata,
2781            } => HydroNode::ChainFirst {
2782                first: Box::new(first.deep_clone(seen_tees)),
2783                second: Box::new(second.deep_clone(seen_tees)),
2784                metadata: metadata.clone(),
2785            },
2786            HydroNode::CrossProduct {
2787                left,
2788                right,
2789                metadata,
2790            } => HydroNode::CrossProduct {
2791                left: Box::new(left.deep_clone(seen_tees)),
2792                right: Box::new(right.deep_clone(seen_tees)),
2793                metadata: metadata.clone(),
2794            },
2795            HydroNode::CrossSingleton {
2796                left,
2797                right,
2798                metadata,
2799            } => HydroNode::CrossSingleton {
2800                left: Box::new(left.deep_clone(seen_tees)),
2801                right: Box::new(right.deep_clone(seen_tees)),
2802                metadata: metadata.clone(),
2803            },
2804            HydroNode::Join {
2805                left,
2806                right,
2807                metadata,
2808            } => HydroNode::Join {
2809                left: Box::new(left.deep_clone(seen_tees)),
2810                right: Box::new(right.deep_clone(seen_tees)),
2811                metadata: metadata.clone(),
2812            },
2813            HydroNode::JoinHalf {
2814                left,
2815                right,
2816                metadata,
2817            } => HydroNode::JoinHalf {
2818                left: Box::new(left.deep_clone(seen_tees)),
2819                right: Box::new(right.deep_clone(seen_tees)),
2820                metadata: metadata.clone(),
2821            },
2822            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2823                pos: Box::new(pos.deep_clone(seen_tees)),
2824                neg: Box::new(neg.deep_clone(seen_tees)),
2825                metadata: metadata.clone(),
2826            },
2827            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2828                pos: Box::new(pos.deep_clone(seen_tees)),
2829                neg: Box::new(neg.deep_clone(seen_tees)),
2830                metadata: metadata.clone(),
2831            },
2832            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2833                input: Box::new(input.deep_clone(seen_tees)),
2834                metadata: metadata.clone(),
2835            },
2836            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2837                HydroNode::ResolveFuturesBlocking {
2838                    input: Box::new(input.deep_clone(seen_tees)),
2839                    metadata: metadata.clone(),
2840                }
2841            }
2842            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2843                HydroNode::ResolveFuturesOrdered {
2844                    input: Box::new(input.deep_clone(seen_tees)),
2845                    metadata: metadata.clone(),
2846                }
2847            }
2848            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2849                f: f.deep_clone(seen_tees),
2850                input: Box::new(input.deep_clone(seen_tees)),
2851                metadata: metadata.clone(),
2852            },
2853            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2854                f: f.deep_clone(seen_tees),
2855                input: Box::new(input.deep_clone(seen_tees)),
2856                metadata: metadata.clone(),
2857            },
2858            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2859                HydroNode::FlatMapStreamBlocking {
2860                    f: f.deep_clone(seen_tees),
2861                    input: Box::new(input.deep_clone(seen_tees)),
2862                    metadata: metadata.clone(),
2863                }
2864            }
2865            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2866                f: f.deep_clone(seen_tees),
2867                input: Box::new(input.deep_clone(seen_tees)),
2868                metadata: metadata.clone(),
2869            },
2870            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2871                f: f.deep_clone(seen_tees),
2872                input: Box::new(input.deep_clone(seen_tees)),
2873                metadata: metadata.clone(),
2874            },
2875            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2876                input: Box::new(input.deep_clone(seen_tees)),
2877                metadata: metadata.clone(),
2878            },
2879            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2880                input: Box::new(input.deep_clone(seen_tees)),
2881                metadata: metadata.clone(),
2882            },
2883            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2884                f: f.deep_clone(seen_tees),
2885                input: Box::new(input.deep_clone(seen_tees)),
2886                metadata: metadata.clone(),
2887            },
2888            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2889                input: Box::new(input.deep_clone(seen_tees)),
2890                metadata: metadata.clone(),
2891            },
2892            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2893                input: Box::new(input.deep_clone(seen_tees)),
2894                metadata: metadata.clone(),
2895            },
2896            HydroNode::Fold {
2897                init,
2898                acc,
2899                input,
2900                metadata,
2901            } => HydroNode::Fold {
2902                init: init.deep_clone(seen_tees),
2903                acc: acc.deep_clone(seen_tees),
2904                input: Box::new(input.deep_clone(seen_tees)),
2905                metadata: metadata.clone(),
2906            },
2907            HydroNode::Scan {
2908                init,
2909                acc,
2910                input,
2911                metadata,
2912            } => HydroNode::Scan {
2913                init: init.deep_clone(seen_tees),
2914                acc: acc.deep_clone(seen_tees),
2915                input: Box::new(input.deep_clone(seen_tees)),
2916                metadata: metadata.clone(),
2917            },
2918            HydroNode::ScanAsyncBlocking {
2919                init,
2920                acc,
2921                input,
2922                metadata,
2923            } => HydroNode::ScanAsyncBlocking {
2924                init: init.deep_clone(seen_tees),
2925                acc: acc.deep_clone(seen_tees),
2926                input: Box::new(input.deep_clone(seen_tees)),
2927                metadata: metadata.clone(),
2928            },
2929            HydroNode::FoldKeyed {
2930                init,
2931                acc,
2932                input,
2933                metadata,
2934            } => HydroNode::FoldKeyed {
2935                init: init.deep_clone(seen_tees),
2936                acc: acc.deep_clone(seen_tees),
2937                input: Box::new(input.deep_clone(seen_tees)),
2938                metadata: metadata.clone(),
2939            },
2940            HydroNode::ReduceKeyedWatermark {
2941                f,
2942                input,
2943                watermark,
2944                metadata,
2945            } => HydroNode::ReduceKeyedWatermark {
2946                f: f.deep_clone(seen_tees),
2947                input: Box::new(input.deep_clone(seen_tees)),
2948                watermark: Box::new(watermark.deep_clone(seen_tees)),
2949                metadata: metadata.clone(),
2950            },
2951            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2952                f: f.deep_clone(seen_tees),
2953                input: Box::new(input.deep_clone(seen_tees)),
2954                metadata: metadata.clone(),
2955            },
2956            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2957                f: f.deep_clone(seen_tees),
2958                input: Box::new(input.deep_clone(seen_tees)),
2959                metadata: metadata.clone(),
2960            },
2961            HydroNode::Network {
2962                name,
2963                networking_info,
2964                serialize_fn,
2965                instantiate_fn,
2966                deserialize_fn,
2967                input,
2968                metadata,
2969            } => HydroNode::Network {
2970                name: name.clone(),
2971                networking_info: networking_info.clone(),
2972                serialize_fn: serialize_fn.clone(),
2973                instantiate_fn: instantiate_fn.clone(),
2974                deserialize_fn: deserialize_fn.clone(),
2975                input: Box::new(input.deep_clone(seen_tees)),
2976                metadata: metadata.clone(),
2977            },
2978            HydroNode::ExternalInput {
2979                from_external_key,
2980                from_port_id,
2981                from_many,
2982                codec_type,
2983                port_hint,
2984                instantiate_fn,
2985                deserialize_fn,
2986                metadata,
2987            } => HydroNode::ExternalInput {
2988                from_external_key: *from_external_key,
2989                from_port_id: *from_port_id,
2990                from_many: *from_many,
2991                codec_type: codec_type.clone(),
2992                port_hint: *port_hint,
2993                instantiate_fn: instantiate_fn.clone(),
2994                deserialize_fn: deserialize_fn.clone(),
2995                metadata: metadata.clone(),
2996            },
2997            HydroNode::Counter {
2998                tag,
2999                duration,
3000                prefix,
3001                input,
3002                metadata,
3003            } => HydroNode::Counter {
3004                tag: tag.clone(),
3005                duration: duration.clone(),
3006                prefix: prefix.clone(),
3007                input: Box::new(input.deep_clone(seen_tees)),
3008                metadata: metadata.clone(),
3009            },
3010        }
3011    }
3012
3013    #[cfg(feature = "build")]
3014    pub fn emit_core(
3015        &mut self,
3016        builders_or_callback: &mut BuildersOrCallback<
3017            impl FnMut(&mut HydroRoot, &mut usize),
3018            impl FnMut(&mut HydroNode, &mut usize),
3019        >,
3020        seen_tees: &mut SeenSharedNodes,
3021        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3022        next_stmt_id: &mut usize,
3023        fold_hooked_idents: &mut HashSet<String>,
3024    ) -> syn::Ident {
3025        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3026
3027        self.transform_bottom_up(
3028            &mut |node: &mut HydroNode| {
3029                let out_location = node.metadata().location_id.clone();
3030                match node {
3031                    HydroNode::Placeholder => {
3032                        panic!()
3033                    }
3034
3035                    HydroNode::Cast { .. } => {
3036                        // Cast passes through the input ident unchanged
3037                        // The input ident is already on the stack from processing the child
3038                        match builders_or_callback {
3039                            BuildersOrCallback::Builders(_) => {}
3040                            BuildersOrCallback::Callback(_, node_callback) => {
3041                                node_callback(node, next_stmt_id);
3042                            }
3043                        }
3044
3045                        *next_stmt_id += 1;
3046                        // input_ident stays on stack as output
3047                    }
3048
3049                    HydroNode::ObserveNonDet {
3050                        inner,
3051                        trusted,
3052                        metadata,
3053                        ..
3054                    } => {
3055                        let inner_ident = ident_stack.pop().unwrap();
3056
3057                        let observe_ident =
3058                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3059
3060                        match builders_or_callback {
3061                            BuildersOrCallback::Builders(graph_builders) => {
3062                                graph_builders.observe_nondet(
3063                                    *trusted,
3064                                    &inner.metadata().location_id,
3065                                    inner_ident,
3066                                    &inner.metadata().collection_kind,
3067                                    &observe_ident,
3068                                    &metadata.collection_kind,
3069                                    &metadata.op,
3070                                );
3071                            }
3072                            BuildersOrCallback::Callback(_, node_callback) => {
3073                                node_callback(node, next_stmt_id);
3074                            }
3075                        }
3076
3077                        *next_stmt_id += 1;
3078
3079                        ident_stack.push(observe_ident);
3080                    }
3081
3082                    HydroNode::Batch {
3083                        inner, metadata, ..
3084                    } => {
3085                        let inner_ident = ident_stack.pop().unwrap();
3086
3087                        let batch_ident =
3088                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3089
3090                        match builders_or_callback {
3091                            BuildersOrCallback::Builders(graph_builders) => {
3092                                graph_builders.batch(
3093                                    inner_ident,
3094                                    &inner.metadata().location_id,
3095                                    &inner.metadata().collection_kind,
3096                                    &batch_ident,
3097                                    &out_location,
3098                                    &metadata.op,
3099                                    fold_hooked_idents,
3100                                );
3101                            }
3102                            BuildersOrCallback::Callback(_, node_callback) => {
3103                                node_callback(node, next_stmt_id);
3104                            }
3105                        }
3106
3107                        *next_stmt_id += 1;
3108
3109                        ident_stack.push(batch_ident);
3110                    }
3111
3112                    HydroNode::YieldConcat { inner, .. } => {
3113                        let inner_ident = ident_stack.pop().unwrap();
3114
3115                        let yield_ident =
3116                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3117
3118                        match builders_or_callback {
3119                            BuildersOrCallback::Builders(graph_builders) => {
3120                                graph_builders.yield_from_tick(
3121                                    inner_ident,
3122                                    &inner.metadata().location_id,
3123                                    &inner.metadata().collection_kind,
3124                                    &yield_ident,
3125                                    &out_location,
3126                                );
3127                            }
3128                            BuildersOrCallback::Callback(_, node_callback) => {
3129                                node_callback(node, next_stmt_id);
3130                            }
3131                        }
3132
3133                        *next_stmt_id += 1;
3134
3135                        ident_stack.push(yield_ident);
3136                    }
3137
3138                    HydroNode::BeginAtomic { inner, metadata } => {
3139                        let inner_ident = ident_stack.pop().unwrap();
3140
3141                        let begin_ident =
3142                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3143
3144                        match builders_or_callback {
3145                            BuildersOrCallback::Builders(graph_builders) => {
3146                                graph_builders.begin_atomic(
3147                                    inner_ident,
3148                                    &inner.metadata().location_id,
3149                                    &inner.metadata().collection_kind,
3150                                    &begin_ident,
3151                                    &out_location,
3152                                    &metadata.op,
3153                                );
3154                            }
3155                            BuildersOrCallback::Callback(_, node_callback) => {
3156                                node_callback(node, next_stmt_id);
3157                            }
3158                        }
3159
3160                        *next_stmt_id += 1;
3161
3162                        ident_stack.push(begin_ident);
3163                    }
3164
3165                    HydroNode::EndAtomic { inner, .. } => {
3166                        let inner_ident = ident_stack.pop().unwrap();
3167
3168                        let end_ident =
3169                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3170
3171                        match builders_or_callback {
3172                            BuildersOrCallback::Builders(graph_builders) => {
3173                                graph_builders.end_atomic(
3174                                    inner_ident,
3175                                    &inner.metadata().location_id,
3176                                    &inner.metadata().collection_kind,
3177                                    &end_ident,
3178                                );
3179                            }
3180                            BuildersOrCallback::Callback(_, node_callback) => {
3181                                node_callback(node, next_stmt_id);
3182                            }
3183                        }
3184
3185                        *next_stmt_id += 1;
3186
3187                        ident_stack.push(end_ident);
3188                    }
3189
3190                    HydroNode::Source {
3191                        source, metadata, ..
3192                    } => {
3193                        if let HydroSource::ExternalNetwork() = source {
3194                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3195                        } else {
3196                            let source_ident =
3197                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3198
3199                            let source_stmt = match source {
3200                                HydroSource::Stream(expr) => {
3201                                    debug_assert!(metadata.location_id.is_top_level());
3202                                    parse_quote! {
3203                                        #source_ident = source_stream(#expr);
3204                                    }
3205                                }
3206
3207                                HydroSource::ExternalNetwork() => {
3208                                    unreachable!()
3209                                }
3210
3211                                HydroSource::Iter(expr) => {
3212                                    if metadata.location_id.is_top_level() {
3213                                        parse_quote! {
3214                                            #source_ident = source_iter(#expr);
3215                                        }
3216                                    } else {
3217                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
3218                                        parse_quote! {
3219                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3220                                        }
3221                                    }
3222                                }
3223
3224                                HydroSource::Spin() => {
3225                                    debug_assert!(metadata.location_id.is_top_level());
3226                                    parse_quote! {
3227                                        #source_ident = spin();
3228                                    }
3229                                }
3230
3231                                HydroSource::ClusterMembers(target_loc, state) => {
3232                                    debug_assert!(metadata.location_id.is_top_level());
3233
3234                                    let members_tee_ident = syn::Ident::new(
3235                                        &format!(
3236                                            "__cluster_members_tee_{}_{}",
3237                                            metadata.location_id.root().key(),
3238                                            target_loc.key(),
3239                                        ),
3240                                        Span::call_site(),
3241                                    );
3242
3243                                    match state {
3244                                        ClusterMembersState::Stream(d) => {
3245                                            parse_quote! {
3246                                                #members_tee_ident = source_stream(#d) -> tee();
3247                                                #source_ident = #members_tee_ident;
3248                                            }
3249                                        },
3250                                        ClusterMembersState::Uninit => syn::parse_quote! {
3251                                            #source_ident = source_stream(DUMMY);
3252                                        },
3253                                        ClusterMembersState::Tee(..) => parse_quote! {
3254                                            #source_ident = #members_tee_ident;
3255                                        },
3256                                    }
3257                                }
3258
3259                                HydroSource::Embedded(ident) => {
3260                                    parse_quote! {
3261                                        #source_ident = source_stream(#ident);
3262                                    }
3263                                }
3264
3265                                HydroSource::EmbeddedSingleton(ident) => {
3266                                    parse_quote! {
3267                                        #source_ident = source_iter([#ident]);
3268                                    }
3269                                }
3270                            };
3271
3272                            match builders_or_callback {
3273                                BuildersOrCallback::Builders(graph_builders) => {
3274                                    let builder = graph_builders.get_dfir_mut(&out_location);
3275                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3276                                }
3277                                BuildersOrCallback::Callback(_, node_callback) => {
3278                                    node_callback(node, next_stmt_id);
3279                                }
3280                            }
3281
3282                            *next_stmt_id += 1;
3283
3284                            ident_stack.push(source_ident);
3285                        }
3286                    }
3287
3288                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3289                        let source_ident =
3290                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3291
3292                        match builders_or_callback {
3293                            BuildersOrCallback::Builders(graph_builders) => {
3294                                let builder = graph_builders.get_dfir_mut(&out_location);
3295
3296                                if *first_tick_only {
3297                                    assert!(
3298                                        !metadata.location_id.is_top_level(),
3299                                        "first_tick_only SingletonSource must be inside a tick"
3300                                    );
3301                                }
3302
3303                                if *first_tick_only
3304                                    || (metadata.location_id.is_top_level()
3305                                        && metadata.collection_kind.is_bounded())
3306                                {
3307                                    builder.add_dfir(
3308                                        parse_quote! {
3309                                            #source_ident = source_iter([#value]);
3310                                        },
3311                                        None,
3312                                        Some(&next_stmt_id.to_string()),
3313                                    );
3314                                } else {
3315                                    builder.add_dfir(
3316                                        parse_quote! {
3317                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3318                                        },
3319                                        None,
3320                                        Some(&next_stmt_id.to_string()),
3321                                    );
3322                                }
3323                            }
3324                            BuildersOrCallback::Callback(_, node_callback) => {
3325                                node_callback(node, next_stmt_id);
3326                            }
3327                        }
3328
3329                        *next_stmt_id += 1;
3330
3331                        ident_stack.push(source_ident);
3332                    }
3333
3334                    HydroNode::CycleSource { cycle_id, .. } => {
3335                        let ident = cycle_id.as_ident();
3336
3337                        match builders_or_callback {
3338                            BuildersOrCallback::Builders(_) => {}
3339                            BuildersOrCallback::Callback(_, node_callback) => {
3340                                node_callback(node, next_stmt_id);
3341                            }
3342                        }
3343
3344                        // consume a stmt id even though we did not emit anything so that we can instrument this
3345                        *next_stmt_id += 1;
3346
3347                        ident_stack.push(ident);
3348                    }
3349
3350                    HydroNode::Tee { inner, .. } => {
3351                        let ret_ident = if let Some(built_idents) =
3352                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3353                        {
3354                            match builders_or_callback {
3355                                BuildersOrCallback::Builders(_) => {}
3356                                BuildersOrCallback::Callback(_, node_callback) => {
3357                                    node_callback(node, next_stmt_id);
3358                                }
3359                            }
3360
3361                            built_idents[0].clone()
3362                        } else {
3363                            // The inner node was already processed by transform_bottom_up,
3364                            // so its ident is on the stack
3365                            let inner_ident = ident_stack.pop().unwrap();
3366
3367                            let tee_ident =
3368                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3369
3370                            built_tees.insert(
3371                                inner.0.as_ref() as *const RefCell<HydroNode>,
3372                                vec![tee_ident.clone()],
3373                            );
3374
3375                            match builders_or_callback {
3376                                BuildersOrCallback::Builders(graph_builders) => {
3377                                    // NOTE: With `forward_ref`, the fold codegen may not have
3378                                    // run yet when we reach this tee, so `fold_hooked_idents`
3379                                    // might not contain the inner ident. In that case we won't
3380                                    // propagate the "hooked" status to the tee and the
3381                                    // downstream singleton batch will use the normal
3382                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3383                                    // This is not a soundness issue: the fallback hook still
3384                                    // produces correct behavior, just with a redundant decision
3385                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3386                                    // fix ordering so forward_ref folds are always processed
3387                                    // before their downstream tees.
3388                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3389                                        fold_hooked_idents.insert(tee_ident.to_string());
3390                                    }
3391                                    let builder = graph_builders.get_dfir_mut(&out_location);
3392                                    builder.add_dfir(
3393                                        parse_quote! {
3394                                            #tee_ident = #inner_ident -> tee();
3395                                        },
3396                                        None,
3397                                        Some(&next_stmt_id.to_string()),
3398                                    );
3399                                }
3400                                BuildersOrCallback::Callback(_, node_callback) => {
3401                                    node_callback(node, next_stmt_id);
3402                                }
3403                            }
3404
3405                            tee_ident
3406                        };
3407
3408                        // we consume a stmt id regardless of if we emit the tee() operator,
3409                        // so that during rewrites we touch all recipients of the tee()
3410
3411                        *next_stmt_id += 1;
3412                        ident_stack.push(ret_ident);
3413                    }
3414
3415                    HydroNode::Singleton { inner, .. } => {
3416                        let ret_ident = if let Some(built_idents) =
3417                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3418                        {
3419                            built_idents[0].clone()
3420                        } else {
3421                            let inner_ident = ident_stack.pop().unwrap();
3422
3423                            let singleton_ident =
3424                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3425
3426                            built_tees.insert(
3427                                inner.0.as_ref() as *const RefCell<HydroNode>,
3428                                vec![singleton_ident.clone()],
3429                            );
3430
3431                            match builders_or_callback {
3432                                BuildersOrCallback::Builders(graph_builders) => {
3433                                    let builder = graph_builders.get_dfir_mut(&out_location);
3434                                    builder.add_dfir(
3435                                        parse_quote! {
3436                                            #singleton_ident = #inner_ident -> singleton();
3437                                        },
3438                                        None,
3439                                        Some(&next_stmt_id.to_string()),
3440                                    );
3441                                }
3442                                BuildersOrCallback::Callback(_, node_callback) => {
3443                                    node_callback(node, next_stmt_id);
3444                                }
3445                            }
3446
3447                            singleton_ident
3448                        };
3449
3450                        // we consume a stmt id regardless of if we emit the singleton() operator,
3451                        // so that during rewrites we touch all recipients of the singleton()
3452                        *next_stmt_id += 1;
3453                        ident_stack.push(ret_ident);
3454                    }
3455
3456                    HydroNode::Partition {
3457                        inner, f, is_true, ..
3458                    } => {
3459                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3460                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3461                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3462                            match builders_or_callback {
3463                                BuildersOrCallback::Builders(_) => {}
3464                                BuildersOrCallback::Callback(_, node_callback) => {
3465                                    node_callback(node, next_stmt_id);
3466                                }
3467                            }
3468
3469                            let idx = if is_true { 0 } else { 1 };
3470                            built_idents[idx].clone()
3471                        } else {
3472                            // The inner node was already processed by transform_bottom_up,
3473                            // so its ident is on the stack
3474                            let inner_ident = ident_stack.pop().unwrap();
3475                            let f_tokens = f.emit_tokens(&mut ident_stack);
3476
3477                            let partition_ident = syn::Ident::new(
3478                                &format!("stream_{}_partition", *next_stmt_id),
3479                                Span::call_site(),
3480                            );
3481                            let true_ident = syn::Ident::new(
3482                                &format!("stream_{}_true", *next_stmt_id),
3483                                Span::call_site(),
3484                            );
3485                            let false_ident = syn::Ident::new(
3486                                &format!("stream_{}_false", *next_stmt_id),
3487                                Span::call_site(),
3488                            );
3489
3490                            built_tees.insert(
3491                                ptr,
3492                                vec![true_ident.clone(), false_ident.clone()],
3493                            );
3494
3495                            match builders_or_callback {
3496                                BuildersOrCallback::Builders(graph_builders) => {
3497                                    let builder = graph_builders.get_dfir_mut(&out_location);
3498                                    builder.add_dfir(
3499                                        parse_quote! {
3500                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3501                                            #true_ident = #partition_ident[0];
3502                                            #false_ident = #partition_ident[1];
3503                                        },
3504                                        None,
3505                                        Some(&next_stmt_id.to_string()),
3506                                    );
3507                                }
3508                                BuildersOrCallback::Callback(_, node_callback) => {
3509                                    node_callback(node, next_stmt_id);
3510                                }
3511                            }
3512
3513                            if is_true { true_ident } else { false_ident }
3514                        };
3515
3516                        *next_stmt_id += 1;
3517                        ident_stack.push(ret_ident);
3518                    }
3519
3520                    HydroNode::Chain { .. } => {
3521                        // Children are processed left-to-right, so second is on top
3522                        let second_ident = ident_stack.pop().unwrap();
3523                        let first_ident = ident_stack.pop().unwrap();
3524
3525                        let chain_ident =
3526                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3527
3528                        match builders_or_callback {
3529                            BuildersOrCallback::Builders(graph_builders) => {
3530                                let builder = graph_builders.get_dfir_mut(&out_location);
3531                                builder.add_dfir(
3532                                    parse_quote! {
3533                                        #chain_ident = chain();
3534                                        #first_ident -> [0]#chain_ident;
3535                                        #second_ident -> [1]#chain_ident;
3536                                    },
3537                                    None,
3538                                    Some(&next_stmt_id.to_string()),
3539                                );
3540                            }
3541                            BuildersOrCallback::Callback(_, node_callback) => {
3542                                node_callback(node, next_stmt_id);
3543                            }
3544                        }
3545
3546                        *next_stmt_id += 1;
3547
3548                        ident_stack.push(chain_ident);
3549                    }
3550
3551                    HydroNode::MergeOrdered { first, metadata, .. } => {
3552                        let second_ident = ident_stack.pop().unwrap();
3553                        let first_ident = ident_stack.pop().unwrap();
3554
3555                        let merge_ident =
3556                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3557
3558                        match builders_or_callback {
3559                            BuildersOrCallback::Builders(graph_builders) => {
3560                                graph_builders.merge_ordered(
3561                                    &first.metadata().location_id,
3562                                    first_ident,
3563                                    second_ident,
3564                                    &merge_ident,
3565                                    &first.metadata().collection_kind,
3566                                    &metadata.op,
3567                                    Some(&next_stmt_id.to_string()),
3568                                );
3569                            }
3570                            BuildersOrCallback::Callback(_, node_callback) => {
3571                                node_callback(node, next_stmt_id);
3572                            }
3573                        }
3574
3575                        *next_stmt_id += 1;
3576
3577                        ident_stack.push(merge_ident);
3578                    }
3579
3580                    HydroNode::ChainFirst { .. } => {
3581                        let second_ident = ident_stack.pop().unwrap();
3582                        let first_ident = ident_stack.pop().unwrap();
3583
3584                        let chain_ident =
3585                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3586
3587                        match builders_or_callback {
3588                            BuildersOrCallback::Builders(graph_builders) => {
3589                                let builder = graph_builders.get_dfir_mut(&out_location);
3590                                builder.add_dfir(
3591                                    parse_quote! {
3592                                        #chain_ident = chain_first_n(1);
3593                                        #first_ident -> [0]#chain_ident;
3594                                        #second_ident -> [1]#chain_ident;
3595                                    },
3596                                    None,
3597                                    Some(&next_stmt_id.to_string()),
3598                                );
3599                            }
3600                            BuildersOrCallback::Callback(_, node_callback) => {
3601                                node_callback(node, next_stmt_id);
3602                            }
3603                        }
3604
3605                        *next_stmt_id += 1;
3606
3607                        ident_stack.push(chain_ident);
3608                    }
3609
3610                    HydroNode::CrossSingleton { right, .. } => {
3611                        let right_ident = ident_stack.pop().unwrap();
3612                        let left_ident = ident_stack.pop().unwrap();
3613
3614                        let cross_ident =
3615                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3616
3617                        match builders_or_callback {
3618                            BuildersOrCallback::Builders(graph_builders) => {
3619                                let builder = graph_builders.get_dfir_mut(&out_location);
3620
3621                                if right.metadata().location_id.is_top_level()
3622                                    && right.metadata().collection_kind.is_bounded()
3623                                {
3624                                    builder.add_dfir(
3625                                        parse_quote! {
3626                                            #cross_ident = cross_singleton();
3627                                            #left_ident -> [input]#cross_ident;
3628                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
3629                                        },
3630                                        None,
3631                                        Some(&next_stmt_id.to_string()),
3632                                    );
3633                                } else {
3634                                    builder.add_dfir(
3635                                        parse_quote! {
3636                                            #cross_ident = cross_singleton();
3637                                            #left_ident -> [input]#cross_ident;
3638                                            #right_ident -> [single]#cross_ident;
3639                                        },
3640                                        None,
3641                                        Some(&next_stmt_id.to_string()),
3642                                    );
3643                                }
3644                            }
3645                            BuildersOrCallback::Callback(_, node_callback) => {
3646                                node_callback(node, next_stmt_id);
3647                            }
3648                        }
3649
3650                        *next_stmt_id += 1;
3651
3652                        ident_stack.push(cross_ident);
3653                    }
3654
3655                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3656                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3657                            parse_quote!(cross_join_multiset)
3658                        } else {
3659                            parse_quote!(join_multiset)
3660                        };
3661
3662                        let (HydroNode::CrossProduct { left, right, .. }
3663                        | HydroNode::Join { left, right, .. }) = node
3664                        else {
3665                            unreachable!()
3666                        };
3667
3668                        let is_top_level = left.metadata().location_id.is_top_level()
3669                            && right.metadata().location_id.is_top_level();
3670                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3671                            quote!('static)
3672                        } else {
3673                            quote!('tick)
3674                        };
3675
3676                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3677                            quote!('static)
3678                        } else {
3679                            quote!('tick)
3680                        };
3681
3682                        let right_ident = ident_stack.pop().unwrap();
3683                        let left_ident = ident_stack.pop().unwrap();
3684
3685                        let stream_ident =
3686                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3687
3688                        match builders_or_callback {
3689                            BuildersOrCallback::Builders(graph_builders) => {
3690                                let builder = graph_builders.get_dfir_mut(&out_location);
3691                                builder.add_dfir(
3692                                    if is_top_level {
3693                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3694                                        // a multiset_delta() to negate the replay behavior
3695                                        parse_quote! {
3696                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3697                                            #left_ident -> [0]#stream_ident;
3698                                            #right_ident -> [1]#stream_ident;
3699                                        }
3700                                    } else {
3701                                        parse_quote! {
3702                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3703                                            #left_ident -> [0]#stream_ident;
3704                                            #right_ident -> [1]#stream_ident;
3705                                        }
3706                                    }
3707                                    ,
3708                                    None,
3709                                    Some(&next_stmt_id.to_string()),
3710                                );
3711                            }
3712                            BuildersOrCallback::Callback(_, node_callback) => {
3713                                node_callback(node, next_stmt_id);
3714                            }
3715                        }
3716
3717                        *next_stmt_id += 1;
3718
3719                        ident_stack.push(stream_ident);
3720                    }
3721
3722                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3723                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3724                            parse_quote!(difference)
3725                        } else {
3726                            parse_quote!(anti_join)
3727                        };
3728
3729                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3730                            node
3731                        else {
3732                            unreachable!()
3733                        };
3734
3735                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3736                            quote!('static)
3737                        } else {
3738                            quote!('tick)
3739                        };
3740
3741                        let neg_ident = ident_stack.pop().unwrap();
3742                        let pos_ident = ident_stack.pop().unwrap();
3743
3744                        let stream_ident =
3745                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3746
3747                        match builders_or_callback {
3748                            BuildersOrCallback::Builders(graph_builders) => {
3749                                let builder = graph_builders.get_dfir_mut(&out_location);
3750                                builder.add_dfir(
3751                                    parse_quote! {
3752                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3753                                        #pos_ident -> [pos]#stream_ident;
3754                                        #neg_ident -> [neg]#stream_ident;
3755                                    },
3756                                    None,
3757                                    Some(&next_stmt_id.to_string()),
3758                                );
3759                            }
3760                            BuildersOrCallback::Callback(_, node_callback) => {
3761                                node_callback(node, next_stmt_id);
3762                            }
3763                        }
3764
3765                        *next_stmt_id += 1;
3766
3767                        ident_stack.push(stream_ident);
3768                    }
3769
3770                    HydroNode::JoinHalf { .. } => {
3771                        let HydroNode::JoinHalf { right, .. } = node else {
3772                            unreachable!()
3773                        };
3774
3775                        assert!(
3776                            right.metadata().collection_kind.is_bounded(),
3777                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3778                            right.metadata().collection_kind
3779                        );
3780
3781                        let build_lifetime = if right.metadata().location_id.is_top_level() {
3782                            quote!('static)
3783                        } else {
3784                            quote!('tick)
3785                        };
3786
3787                        let build_ident = ident_stack.pop().unwrap();
3788                        let probe_ident = ident_stack.pop().unwrap();
3789
3790                        let stream_ident =
3791                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3792
3793                        match builders_or_callback {
3794                            BuildersOrCallback::Builders(graph_builders) => {
3795                                let builder = graph_builders.get_dfir_mut(&out_location);
3796                                builder.add_dfir(
3797                                    parse_quote! {
3798                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3799                                        #probe_ident -> [probe]#stream_ident;
3800                                        #build_ident -> [build]#stream_ident;
3801                                    },
3802                                    None,
3803                                    Some(&next_stmt_id.to_string()),
3804                                );
3805                            }
3806                            BuildersOrCallback::Callback(_, node_callback) => {
3807                                node_callback(node, next_stmt_id);
3808                            }
3809                        }
3810
3811                        *next_stmt_id += 1;
3812
3813                        ident_stack.push(stream_ident);
3814                    }
3815
3816                    HydroNode::ResolveFutures { .. } => {
3817                        let input_ident = ident_stack.pop().unwrap();
3818
3819                        let futures_ident =
3820                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3821
3822                        match builders_or_callback {
3823                            BuildersOrCallback::Builders(graph_builders) => {
3824                                let builder = graph_builders.get_dfir_mut(&out_location);
3825                                builder.add_dfir(
3826                                    parse_quote! {
3827                                        #futures_ident = #input_ident -> resolve_futures();
3828                                    },
3829                                    None,
3830                                    Some(&next_stmt_id.to_string()),
3831                                );
3832                            }
3833                            BuildersOrCallback::Callback(_, node_callback) => {
3834                                node_callback(node, next_stmt_id);
3835                            }
3836                        }
3837
3838                        *next_stmt_id += 1;
3839
3840                        ident_stack.push(futures_ident);
3841                    }
3842
3843                    HydroNode::ResolveFuturesBlocking { .. } => {
3844                        let input_ident = ident_stack.pop().unwrap();
3845
3846                        let futures_ident =
3847                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3848
3849                        match builders_or_callback {
3850                            BuildersOrCallback::Builders(graph_builders) => {
3851                                let builder = graph_builders.get_dfir_mut(&out_location);
3852                                builder.add_dfir(
3853                                    parse_quote! {
3854                                        #futures_ident = #input_ident -> resolve_futures_blocking();
3855                                    },
3856                                    None,
3857                                    Some(&next_stmt_id.to_string()),
3858                                );
3859                            }
3860                            BuildersOrCallback::Callback(_, node_callback) => {
3861                                node_callback(node, next_stmt_id);
3862                            }
3863                        }
3864
3865                        *next_stmt_id += 1;
3866
3867                        ident_stack.push(futures_ident);
3868                    }
3869
3870                    HydroNode::ResolveFuturesOrdered { .. } => {
3871                        let input_ident = ident_stack.pop().unwrap();
3872
3873                        let futures_ident =
3874                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3875
3876                        match builders_or_callback {
3877                            BuildersOrCallback::Builders(graph_builders) => {
3878                                let builder = graph_builders.get_dfir_mut(&out_location);
3879                                builder.add_dfir(
3880                                    parse_quote! {
3881                                        #futures_ident = #input_ident -> resolve_futures_ordered();
3882                                    },
3883                                    None,
3884                                    Some(&next_stmt_id.to_string()),
3885                                );
3886                            }
3887                            BuildersOrCallback::Callback(_, node_callback) => {
3888                                node_callback(node, next_stmt_id);
3889                            }
3890                        }
3891
3892                        *next_stmt_id += 1;
3893
3894                        ident_stack.push(futures_ident);
3895                    }
3896
3897                    HydroNode::Map { f, .. } => {
3898                        // Pop input ident (pushed last by transform_children).
3899                        let input_ident = ident_stack.pop().unwrap();
3900                        let f_tokens = f.emit_tokens(&mut ident_stack);
3901
3902                        let map_ident =
3903                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3904
3905                        match builders_or_callback {
3906                            BuildersOrCallback::Builders(graph_builders) => {
3907                                let builder = graph_builders.get_dfir_mut(&out_location);
3908                                builder.add_dfir(
3909                                    parse_quote! {
3910                                        #map_ident = #input_ident -> map(#f_tokens);
3911                                    },
3912                                    None,
3913                                    Some(&next_stmt_id.to_string()),
3914                                );
3915                            }
3916                            BuildersOrCallback::Callback(_, node_callback) => {
3917                                node_callback(node, next_stmt_id);
3918                            }
3919                        }
3920
3921                        *next_stmt_id += 1;
3922
3923                        ident_stack.push(map_ident);
3924                    }
3925
3926                    HydroNode::FlatMap { f, .. } => {
3927                        let input_ident = ident_stack.pop().unwrap();
3928                        let f_tokens = f.emit_tokens(&mut ident_stack);
3929
3930                        let flat_map_ident =
3931                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3932
3933                        match builders_or_callback {
3934                            BuildersOrCallback::Builders(graph_builders) => {
3935                                let builder = graph_builders.get_dfir_mut(&out_location);
3936                                builder.add_dfir(
3937                                    parse_quote! {
3938                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
3939                                    },
3940                                    None,
3941                                    Some(&next_stmt_id.to_string()),
3942                                );
3943                            }
3944                            BuildersOrCallback::Callback(_, node_callback) => {
3945                                node_callback(node, next_stmt_id);
3946                            }
3947                        }
3948
3949                        *next_stmt_id += 1;
3950
3951                        ident_stack.push(flat_map_ident);
3952                    }
3953
3954                    HydroNode::FlatMapStreamBlocking { f, .. } => {
3955                        let input_ident = ident_stack.pop().unwrap();
3956                        let f_tokens = f.emit_tokens(&mut ident_stack);
3957
3958                        let flat_map_stream_blocking_ident =
3959                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3960
3961                        match builders_or_callback {
3962                            BuildersOrCallback::Builders(graph_builders) => {
3963                                let builder = graph_builders.get_dfir_mut(&out_location);
3964                                builder.add_dfir(
3965                                    parse_quote! {
3966                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
3967                                    },
3968                                    None,
3969                                    Some(&next_stmt_id.to_string()),
3970                                );
3971                            }
3972                            BuildersOrCallback::Callback(_, node_callback) => {
3973                                node_callback(node, next_stmt_id);
3974                            }
3975                        }
3976
3977                        *next_stmt_id += 1;
3978
3979                        ident_stack.push(flat_map_stream_blocking_ident);
3980                    }
3981
3982                    HydroNode::Filter { f, .. } => {
3983                        let input_ident = ident_stack.pop().unwrap();
3984                        let f_tokens = f.emit_tokens(&mut ident_stack);
3985
3986                        let filter_ident =
3987                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3988
3989                        match builders_or_callback {
3990                            BuildersOrCallback::Builders(graph_builders) => {
3991                                let builder = graph_builders.get_dfir_mut(&out_location);
3992                                builder.add_dfir(
3993                                    parse_quote! {
3994                                        #filter_ident = #input_ident -> filter(#f_tokens);
3995                                    },
3996                                    None,
3997                                    Some(&next_stmt_id.to_string()),
3998                                );
3999                            }
4000                            BuildersOrCallback::Callback(_, node_callback) => {
4001                                node_callback(node, next_stmt_id);
4002                            }
4003                        }
4004
4005                        *next_stmt_id += 1;
4006
4007                        ident_stack.push(filter_ident);
4008                    }
4009
4010                    HydroNode::FilterMap { f, .. } => {
4011                        let input_ident = ident_stack.pop().unwrap();
4012                        let f_tokens = f.emit_tokens(&mut ident_stack);
4013
4014                        let filter_map_ident =
4015                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4016
4017                        match builders_or_callback {
4018                            BuildersOrCallback::Builders(graph_builders) => {
4019                                let builder = graph_builders.get_dfir_mut(&out_location);
4020                                builder.add_dfir(
4021                                    parse_quote! {
4022                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4023                                    },
4024                                    None,
4025                                    Some(&next_stmt_id.to_string()),
4026                                );
4027                            }
4028                            BuildersOrCallback::Callback(_, node_callback) => {
4029                                node_callback(node, next_stmt_id);
4030                            }
4031                        }
4032
4033                        *next_stmt_id += 1;
4034
4035                        ident_stack.push(filter_map_ident);
4036                    }
4037
4038                    HydroNode::Sort { .. } => {
4039                        let input_ident = ident_stack.pop().unwrap();
4040
4041                        let sort_ident =
4042                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4043
4044                        match builders_or_callback {
4045                            BuildersOrCallback::Builders(graph_builders) => {
4046                                let builder = graph_builders.get_dfir_mut(&out_location);
4047                                builder.add_dfir(
4048                                    parse_quote! {
4049                                        #sort_ident = #input_ident -> sort();
4050                                    },
4051                                    None,
4052                                    Some(&next_stmt_id.to_string()),
4053                                );
4054                            }
4055                            BuildersOrCallback::Callback(_, node_callback) => {
4056                                node_callback(node, next_stmt_id);
4057                            }
4058                        }
4059
4060                        *next_stmt_id += 1;
4061
4062                        ident_stack.push(sort_ident);
4063                    }
4064
4065                    HydroNode::DeferTick { .. } => {
4066                        let input_ident = ident_stack.pop().unwrap();
4067
4068                        let defer_tick_ident =
4069                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4070
4071                        match builders_or_callback {
4072                            BuildersOrCallback::Builders(graph_builders) => {
4073                                let builder = graph_builders.get_dfir_mut(&out_location);
4074                                builder.add_dfir(
4075                                    parse_quote! {
4076                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4077                                    },
4078                                    None,
4079                                    Some(&next_stmt_id.to_string()),
4080                                );
4081                            }
4082                            BuildersOrCallback::Callback(_, node_callback) => {
4083                                node_callback(node, next_stmt_id);
4084                            }
4085                        }
4086
4087                        *next_stmt_id += 1;
4088
4089                        ident_stack.push(defer_tick_ident);
4090                    }
4091
4092                    HydroNode::Enumerate { input, .. } => {
4093                        let input_ident = ident_stack.pop().unwrap();
4094
4095                        let enumerate_ident =
4096                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4097
4098                        match builders_or_callback {
4099                            BuildersOrCallback::Builders(graph_builders) => {
4100                                let builder = graph_builders.get_dfir_mut(&out_location);
4101                                let lifetime = if input.metadata().location_id.is_top_level() {
4102                                    quote!('static)
4103                                } else {
4104                                    quote!('tick)
4105                                };
4106                                builder.add_dfir(
4107                                    parse_quote! {
4108                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4109                                    },
4110                                    None,
4111                                    Some(&next_stmt_id.to_string()),
4112                                );
4113                            }
4114                            BuildersOrCallback::Callback(_, node_callback) => {
4115                                node_callback(node, next_stmt_id);
4116                            }
4117                        }
4118
4119                        *next_stmt_id += 1;
4120
4121                        ident_stack.push(enumerate_ident);
4122                    }
4123
4124                    HydroNode::Inspect { f, .. } => {
4125                        let input_ident = ident_stack.pop().unwrap();
4126                        let f_tokens = f.emit_tokens(&mut ident_stack);
4127
4128                        let inspect_ident =
4129                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4130
4131                        match builders_or_callback {
4132                            BuildersOrCallback::Builders(graph_builders) => {
4133                                let builder = graph_builders.get_dfir_mut(&out_location);
4134                                builder.add_dfir(
4135                                    parse_quote! {
4136                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4137                                    },
4138                                    None,
4139                                    Some(&next_stmt_id.to_string()),
4140                                );
4141                            }
4142                            BuildersOrCallback::Callback(_, node_callback) => {
4143                                node_callback(node, next_stmt_id);
4144                            }
4145                        }
4146
4147                        *next_stmt_id += 1;
4148
4149                        ident_stack.push(inspect_ident);
4150                    }
4151
4152                    HydroNode::Unique { input, .. } => {
4153                        let input_ident = ident_stack.pop().unwrap();
4154
4155                        let unique_ident =
4156                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4157
4158                        match builders_or_callback {
4159                            BuildersOrCallback::Builders(graph_builders) => {
4160                                let builder = graph_builders.get_dfir_mut(&out_location);
4161                                let lifetime = if input.metadata().location_id.is_top_level() {
4162                                    quote!('static)
4163                                } else {
4164                                    quote!('tick)
4165                                };
4166
4167                                builder.add_dfir(
4168                                    parse_quote! {
4169                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4170                                    },
4171                                    None,
4172                                    Some(&next_stmt_id.to_string()),
4173                                );
4174                            }
4175                            BuildersOrCallback::Callback(_, node_callback) => {
4176                                node_callback(node, next_stmt_id);
4177                            }
4178                        }
4179
4180                        *next_stmt_id += 1;
4181
4182                        ident_stack.push(unique_ident);
4183                    }
4184
4185                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4186                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4187                            if input.metadata().location_id.is_top_level()
4188                                && input.metadata().collection_kind.is_bounded()
4189                            {
4190                                parse_quote!(fold_no_replay)
4191                            } else {
4192                                parse_quote!(fold)
4193                            }
4194                        } else if matches!(node, HydroNode::Scan { .. }) {
4195                            parse_quote!(scan)
4196                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4197                            parse_quote!(scan_async_blocking)
4198                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4199                            if input.metadata().location_id.is_top_level()
4200                                && input.metadata().collection_kind.is_bounded()
4201                            {
4202                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4203                            } else {
4204                                parse_quote!(fold_keyed)
4205                            }
4206                        } else {
4207                            unreachable!()
4208                        };
4209
4210                        let (HydroNode::Fold { input, .. }
4211                        | HydroNode::FoldKeyed { input, .. }
4212                        | HydroNode::Scan { input, .. }
4213                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4214                        else {
4215                            unreachable!()
4216                        };
4217
4218                        let lifetime = if input.metadata().location_id.is_top_level() {
4219                            quote!('static)
4220                        } else {
4221                            quote!('tick)
4222                        };
4223
4224                        let input_ident = ident_stack.pop().unwrap();
4225
4226                        let (HydroNode::Fold { init, acc, .. }
4227                        | HydroNode::FoldKeyed { init, acc, .. }
4228                        | HydroNode::Scan { init, acc, .. }
4229                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4230                        else {
4231                            unreachable!()
4232                        };
4233
4234                        let acc_tokens = acc.emit_tokens(&mut ident_stack);
4235                        let init_tokens = init.emit_tokens(&mut ident_stack);
4236
4237                        let fold_ident =
4238                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4239
4240                        match builders_or_callback {
4241                            BuildersOrCallback::Builders(graph_builders) => {
4242                                if matches!(node, HydroNode::Fold { .. })
4243                                    && node.metadata().location_id.is_top_level()
4244                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4245                                    && graph_builders.singleton_intermediates()
4246                                    && !node.metadata().collection_kind.is_bounded()
4247                                {
4248                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4249                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4250                                        &input.metadata().location_id,
4251                                        &input_ident,
4252                                        &input.metadata().collection_kind,
4253                                        &node.metadata().op,
4254                                    );
4255
4256                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4257                                        let acc: syn::Expr = parse_quote!({
4258                                            let mut __inner = #acc_tokens;
4259                                            move |__state, __batch: Vec<_>| {
4260                                                if __batch.is_empty() {
4261                                                    return None;
4262                                                }
4263                                                for __value in __batch {
4264                                                    __inner(__state, __value);
4265                                                }
4266                                                Some(__state.clone())
4267                                            }
4268                                        });
4269                                        (hooked, acc)
4270                                    } else {
4271                                        let acc: syn::Expr = parse_quote!({
4272                                            let mut __inner = #acc_tokens;
4273                                            move |__state, __value| {
4274                                                __inner(__state, __value);
4275                                                Some(__state.clone())
4276                                            }
4277                                        });
4278                                        (&input_ident, acc)
4279                                    };
4280
4281                                    let builder = graph_builders.get_dfir_mut(&out_location);
4282                                    builder.add_dfir(
4283                                        parse_quote! {
4284                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4285                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4286                                            #fold_ident = chain();
4287                                        },
4288                                        None,
4289                                        Some(&next_stmt_id.to_string()),
4290                                    );
4291
4292                                    if hooked_input_ident.is_some() {
4293                                        fold_hooked_idents.insert(fold_ident.to_string());
4294                                    }
4295                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4296                                    && node.metadata().location_id.is_top_level()
4297                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4298                                    && graph_builders.singleton_intermediates()
4299                                    && !node.metadata().collection_kind.is_bounded()
4300                                {
4301                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4302                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4303                                        &input.metadata().location_id,
4304                                        &input_ident,
4305                                        &input.metadata().collection_kind,
4306                                        &node.metadata().op,
4307                                    );
4308                                    let builder = graph_builders.get_dfir_mut(&out_location);
4309
4310                                    let wrapped_acc: syn::Expr = parse_quote!({
4311                                        let mut __init = #init_tokens;
4312                                        let mut __inner = #acc_tokens;
4313                                        move |__state, __kv: (_, _)| {
4314                                            // TODO(shadaj): we can avoid the clone when the entry exists
4315                                            let __state = __state
4316                                                .entry(::std::clone::Clone::clone(&__kv.0))
4317                                                .or_insert_with(|| (__init)());
4318                                            __inner(__state, __kv.1);
4319                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4320                                        }
4321                                    });
4322
4323                                    if let Some(hooked_input_ident) = hooked_input_ident {
4324                                        builder.add_dfir(
4325                                            parse_quote! {
4326                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4327                                            },
4328                                            None,
4329                                            Some(&next_stmt_id.to_string()),
4330                                        );
4331
4332                                        fold_hooked_idents.insert(fold_ident.to_string());
4333                                    } else {
4334                                        builder.add_dfir(
4335                                            parse_quote! {
4336                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4337                                            },
4338                                            None,
4339                                            Some(&next_stmt_id.to_string()),
4340                                        );
4341                                    }
4342                                } else if (matches!(node, HydroNode::Fold { .. })
4343                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4344                                    && !node.metadata().location_id.is_top_level()
4345                                    && graph_builders.singleton_intermediates()
4346                                {
4347                                    let input_ref = match &*node {
4348                                        HydroNode::Fold { input, .. } => input,
4349                                        HydroNode::FoldKeyed { input, .. } => input,
4350                                        _ => unreachable!(),
4351                                    };
4352                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4353                                        &input_ref.metadata().location_id,
4354                                        &input_ident,
4355                                        &input_ref.metadata().collection_kind,
4356                                        &node.metadata().op,
4357                                    );
4358
4359                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4360                                    let builder = graph_builders.get_dfir_mut(&out_location);
4361                                    builder.add_dfir(
4362                                        parse_quote! {
4363                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4364                                        },
4365                                        None,
4366                                        Some(&next_stmt_id.to_string()),
4367                                    );
4368                                } else {
4369                                    let builder = graph_builders.get_dfir_mut(&out_location);
4370                                    builder.add_dfir(
4371                                        parse_quote! {
4372                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4373                                        },
4374                                        None,
4375                                        Some(&next_stmt_id.to_string()),
4376                                    );
4377                                }
4378                            }
4379                            BuildersOrCallback::Callback(_, node_callback) => {
4380                                node_callback(node, next_stmt_id);
4381                            }
4382                        }
4383
4384                        *next_stmt_id += 1;
4385
4386                        ident_stack.push(fold_ident);
4387                    }
4388
4389                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4390                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4391                            if input.metadata().location_id.is_top_level()
4392                                && input.metadata().collection_kind.is_bounded()
4393                            {
4394                                parse_quote!(reduce_no_replay)
4395                            } else {
4396                                parse_quote!(reduce)
4397                            }
4398                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4399                            if input.metadata().location_id.is_top_level()
4400                                && input.metadata().collection_kind.is_bounded()
4401                            {
4402                                todo!(
4403                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4404                                )
4405                            } else {
4406                                parse_quote!(reduce_keyed)
4407                            }
4408                        } else {
4409                            unreachable!()
4410                        };
4411
4412                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4413                        else {
4414                            unreachable!()
4415                        };
4416
4417                        let lifetime = if input.metadata().location_id.is_top_level() {
4418                            quote!('static)
4419                        } else {
4420                            quote!('tick)
4421                        };
4422
4423                        let input_ident = ident_stack.pop().unwrap();
4424
4425                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4426                        else {
4427                            unreachable!()
4428                        };
4429
4430                        let f_tokens = f.emit_tokens(&mut ident_stack);
4431
4432                        let reduce_ident =
4433                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4434
4435                        match builders_or_callback {
4436                            BuildersOrCallback::Builders(graph_builders) => {
4437                                if matches!(node, HydroNode::Reduce { .. })
4438                                    && node.metadata().location_id.is_top_level()
4439                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4440                                    && graph_builders.singleton_intermediates()
4441                                    && !node.metadata().collection_kind.is_bounded()
4442                                {
4443                                    todo!(
4444                                        "Reduce with optional intermediates is not yet supported in simulator"
4445                                    );
4446                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4447                                    && node.metadata().location_id.is_top_level()
4448                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4449                                    && graph_builders.singleton_intermediates()
4450                                    && !node.metadata().collection_kind.is_bounded()
4451                                {
4452                                    todo!(
4453                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4454                                    );
4455                                } else {
4456                                    let builder = graph_builders.get_dfir_mut(&out_location);
4457                                    builder.add_dfir(
4458                                        parse_quote! {
4459                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4460                                        },
4461                                        None,
4462                                        Some(&next_stmt_id.to_string()),
4463                                    );
4464                                }
4465                            }
4466                            BuildersOrCallback::Callback(_, node_callback) => {
4467                                node_callback(node, next_stmt_id);
4468                            }
4469                        }
4470
4471                        *next_stmt_id += 1;
4472
4473                        ident_stack.push(reduce_ident);
4474                    }
4475
4476                    HydroNode::ReduceKeyedWatermark {
4477                        f,
4478                        input,
4479                        metadata,
4480                        ..
4481                    } => {
4482                        let lifetime = if input.metadata().location_id.is_top_level() {
4483                            quote!('static)
4484                        } else {
4485                            quote!('tick)
4486                        };
4487
4488                        // watermark is processed second, so it's on top
4489                        let watermark_ident = ident_stack.pop().unwrap();
4490                        let input_ident = ident_stack.pop().unwrap();
4491                        let f_tokens = f.emit_tokens(&mut ident_stack);
4492
4493                        let chain_ident = syn::Ident::new(
4494                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4495                            Span::call_site(),
4496                        );
4497
4498                        let fold_ident =
4499                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4500
4501                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4502                            && input.metadata().collection_kind.is_bounded()
4503                        {
4504                            parse_quote!(fold_no_replay)
4505                        } else {
4506                            parse_quote!(fold)
4507                        };
4508
4509                        match builders_or_callback {
4510                            BuildersOrCallback::Builders(graph_builders) => {
4511                                if metadata.location_id.is_top_level()
4512                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4513                                    && graph_builders.singleton_intermediates()
4514                                    && !metadata.collection_kind.is_bounded()
4515                                {
4516                                    todo!(
4517                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4518                                    )
4519                                } else {
4520                                    let builder = graph_builders.get_dfir_mut(&out_location);
4521                                    builder.add_dfir(
4522                                        parse_quote! {
4523                                            #chain_ident = chain();
4524                                            #input_ident
4525                                                -> map(|x| (Some(x), None))
4526                                                -> [0]#chain_ident;
4527                                            #watermark_ident
4528                                                -> map(|watermark| (None, Some(watermark)))
4529                                                -> [1]#chain_ident;
4530
4531                                            #fold_ident = #chain_ident
4532                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4533                                                    let __reduce_keyed_fn = #f_tokens;
4534                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4535                                                        if let Some((k, v)) = opt_payload {
4536                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4537                                                                if k < curr_watermark {
4538                                                                    return;
4539                                                                }
4540                                                            }
4541                                                            match map.entry(k) {
4542                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4543                                                                    e.insert(v);
4544                                                                }
4545                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4546                                                                    __reduce_keyed_fn(e.get_mut(), v);
4547                                                                }
4548                                                            }
4549                                                        } else {
4550                                                            let watermark = opt_watermark.unwrap();
4551                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4552                                                                if watermark <= curr_watermark {
4553                                                                    return;
4554                                                                }
4555                                                            }
4556                                                            map.retain(|k, _| *k >= watermark);
4557                                                            *opt_curr_watermark = Some(watermark);
4558                                                        }
4559                                                    }
4560                                                })
4561                                                -> flat_map(|(map, _curr_watermark)| map);
4562                                        },
4563                                        None,
4564                                        Some(&next_stmt_id.to_string()),
4565                                    );
4566                                }
4567                            }
4568                            BuildersOrCallback::Callback(_, node_callback) => {
4569                                node_callback(node, next_stmt_id);
4570                            }
4571                        }
4572
4573                        *next_stmt_id += 1;
4574
4575                        ident_stack.push(fold_ident);
4576                    }
4577
4578                    HydroNode::Network {
4579                        networking_info,
4580                        serialize_fn: serialize_pipeline,
4581                        instantiate_fn,
4582                        deserialize_fn: deserialize_pipeline,
4583                        input,
4584                        ..
4585                    } => {
4586                        let input_ident = ident_stack.pop().unwrap();
4587
4588                        let receiver_stream_ident =
4589                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4590
4591                        match builders_or_callback {
4592                            BuildersOrCallback::Builders(graph_builders) => {
4593                                let (sink_expr, source_expr) = match instantiate_fn {
4594                                    DebugInstantiate::Building => (
4595                                        syn::parse_quote!(DUMMY_SINK),
4596                                        syn::parse_quote!(DUMMY_SOURCE),
4597                                    ),
4598
4599                                    DebugInstantiate::Finalized(finalized) => {
4600                                        (finalized.sink.clone(), finalized.source.clone())
4601                                    }
4602                                };
4603
4604                                graph_builders.create_network(
4605                                    &input.metadata().location_id,
4606                                    &out_location,
4607                                    input_ident,
4608                                    &receiver_stream_ident,
4609                                    serialize_pipeline.as_ref(),
4610                                    sink_expr,
4611                                    source_expr,
4612                                    deserialize_pipeline.as_ref(),
4613                                    *next_stmt_id,
4614                                    networking_info,
4615                                );
4616                            }
4617                            BuildersOrCallback::Callback(_, node_callback) => {
4618                                node_callback(node, next_stmt_id);
4619                            }
4620                        }
4621
4622                        *next_stmt_id += 1;
4623
4624                        ident_stack.push(receiver_stream_ident);
4625                    }
4626
4627                    HydroNode::ExternalInput {
4628                        instantiate_fn,
4629                        deserialize_fn: deserialize_pipeline,
4630                        ..
4631                    } => {
4632                        let receiver_stream_ident =
4633                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4634
4635                        match builders_or_callback {
4636                            BuildersOrCallback::Builders(graph_builders) => {
4637                                let (_, source_expr) = match instantiate_fn {
4638                                    DebugInstantiate::Building => (
4639                                        syn::parse_quote!(DUMMY_SINK),
4640                                        syn::parse_quote!(DUMMY_SOURCE),
4641                                    ),
4642
4643                                    DebugInstantiate::Finalized(finalized) => {
4644                                        (finalized.sink.clone(), finalized.source.clone())
4645                                    }
4646                                };
4647
4648                                graph_builders.create_external_source(
4649                                    &out_location,
4650                                    source_expr,
4651                                    &receiver_stream_ident,
4652                                    deserialize_pipeline.as_ref(),
4653                                    *next_stmt_id,
4654                                );
4655                            }
4656                            BuildersOrCallback::Callback(_, node_callback) => {
4657                                node_callback(node, next_stmt_id);
4658                            }
4659                        }
4660
4661                        *next_stmt_id += 1;
4662
4663                        ident_stack.push(receiver_stream_ident);
4664                    }
4665
4666                    HydroNode::Counter {
4667                        tag,
4668                        duration,
4669                        prefix,
4670                        ..
4671                    } => {
4672                        let input_ident = ident_stack.pop().unwrap();
4673
4674                        let counter_ident =
4675                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4676
4677                        match builders_or_callback {
4678                            BuildersOrCallback::Builders(graph_builders) => {
4679                                let arg = format!("{}({})", prefix, tag);
4680                                let builder = graph_builders.get_dfir_mut(&out_location);
4681                                builder.add_dfir(
4682                                    parse_quote! {
4683                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4684                                    },
4685                                    None,
4686                                    Some(&next_stmt_id.to_string()),
4687                                );
4688                            }
4689                            BuildersOrCallback::Callback(_, node_callback) => {
4690                                node_callback(node, next_stmt_id);
4691                            }
4692                        }
4693
4694                        *next_stmt_id += 1;
4695
4696                        ident_stack.push(counter_ident);
4697                    }
4698                }
4699            },
4700            seen_tees,
4701            false,
4702        );
4703
4704        let ret = ident_stack
4705            .pop()
4706            .expect("ident_stack should have exactly one element after traversal");
4707        assert!(
4708            ident_stack.is_empty(),
4709            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4710             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4711            ident_stack.len()
4712        );
4713        ret
4714    }
4715
4716    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4717        match self {
4718            HydroNode::Placeholder => {
4719                panic!()
4720            }
4721            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4722            HydroNode::Source { source, .. } => match source {
4723                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4724                HydroSource::ExternalNetwork()
4725                | HydroSource::Spin()
4726                | HydroSource::ClusterMembers(_, _)
4727                | HydroSource::Embedded(_)
4728                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4729            },
4730            HydroNode::SingletonSource { value, .. } => {
4731                transform(value);
4732            }
4733            HydroNode::CycleSource { .. }
4734            | HydroNode::Tee { .. }
4735            | HydroNode::Singleton { .. }
4736            | HydroNode::YieldConcat { .. }
4737            | HydroNode::BeginAtomic { .. }
4738            | HydroNode::EndAtomic { .. }
4739            | HydroNode::Batch { .. }
4740            | HydroNode::Chain { .. }
4741            | HydroNode::MergeOrdered { .. }
4742            | HydroNode::ChainFirst { .. }
4743            | HydroNode::CrossProduct { .. }
4744            | HydroNode::CrossSingleton { .. }
4745            | HydroNode::ResolveFutures { .. }
4746            | HydroNode::ResolveFuturesBlocking { .. }
4747            | HydroNode::ResolveFuturesOrdered { .. }
4748            | HydroNode::Join { .. }
4749            | HydroNode::JoinHalf { .. }
4750            | HydroNode::Difference { .. }
4751            | HydroNode::AntiJoin { .. }
4752            | HydroNode::DeferTick { .. }
4753            | HydroNode::Enumerate { .. }
4754            | HydroNode::Unique { .. }
4755            | HydroNode::Sort { .. } => {}
4756            HydroNode::Map { f, .. }
4757            | HydroNode::FlatMap { f, .. }
4758            | HydroNode::FlatMapStreamBlocking { f, .. }
4759            | HydroNode::Filter { f, .. }
4760            | HydroNode::FilterMap { f, .. }
4761            | HydroNode::Inspect { f, .. }
4762            | HydroNode::Partition { f, .. }
4763            | HydroNode::Reduce { f, .. }
4764            | HydroNode::ReduceKeyed { f, .. }
4765            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4766                transform(&mut f.expr);
4767            }
4768            HydroNode::Fold { init, acc, .. }
4769            | HydroNode::Scan { init, acc, .. }
4770            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4771            | HydroNode::FoldKeyed { init, acc, .. } => {
4772                transform(&mut init.expr);
4773                transform(&mut acc.expr);
4774            }
4775            HydroNode::Network {
4776                serialize_fn,
4777                deserialize_fn,
4778                ..
4779            } => {
4780                if let Some(serialize_fn) = serialize_fn {
4781                    transform(serialize_fn);
4782                }
4783                if let Some(deserialize_fn) = deserialize_fn {
4784                    transform(deserialize_fn);
4785                }
4786            }
4787            HydroNode::ExternalInput { deserialize_fn, .. } => {
4788                if let Some(deserialize_fn) = deserialize_fn {
4789                    transform(deserialize_fn);
4790                }
4791            }
4792            HydroNode::Counter { duration, .. } => {
4793                transform(duration);
4794            }
4795        }
4796    }
4797
4798    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4799        &self.metadata().op
4800    }
4801
4802    pub fn metadata(&self) -> &HydroIrMetadata {
4803        match self {
4804            HydroNode::Placeholder => {
4805                panic!()
4806            }
4807            HydroNode::Cast { metadata, .. }
4808            | HydroNode::ObserveNonDet { metadata, .. }
4809            | HydroNode::Source { metadata, .. }
4810            | HydroNode::SingletonSource { metadata, .. }
4811            | HydroNode::CycleSource { metadata, .. }
4812            | HydroNode::Tee { metadata, .. }
4813            | HydroNode::Singleton { metadata, .. }
4814            | HydroNode::Partition { metadata, .. }
4815            | HydroNode::YieldConcat { metadata, .. }
4816            | HydroNode::BeginAtomic { metadata, .. }
4817            | HydroNode::EndAtomic { metadata, .. }
4818            | HydroNode::Batch { metadata, .. }
4819            | HydroNode::Chain { metadata, .. }
4820            | HydroNode::MergeOrdered { metadata, .. }
4821            | HydroNode::ChainFirst { metadata, .. }
4822            | HydroNode::CrossProduct { metadata, .. }
4823            | HydroNode::CrossSingleton { metadata, .. }
4824            | HydroNode::Join { metadata, .. }
4825            | HydroNode::JoinHalf { metadata, .. }
4826            | HydroNode::Difference { metadata, .. }
4827            | HydroNode::AntiJoin { metadata, .. }
4828            | HydroNode::ResolveFutures { metadata, .. }
4829            | HydroNode::ResolveFuturesBlocking { metadata, .. }
4830            | HydroNode::ResolveFuturesOrdered { metadata, .. }
4831            | HydroNode::Map { metadata, .. }
4832            | HydroNode::FlatMap { metadata, .. }
4833            | HydroNode::FlatMapStreamBlocking { metadata, .. }
4834            | HydroNode::Filter { metadata, .. }
4835            | HydroNode::FilterMap { metadata, .. }
4836            | HydroNode::DeferTick { metadata, .. }
4837            | HydroNode::Enumerate { metadata, .. }
4838            | HydroNode::Inspect { metadata, .. }
4839            | HydroNode::Unique { metadata, .. }
4840            | HydroNode::Sort { metadata, .. }
4841            | HydroNode::Scan { metadata, .. }
4842            | HydroNode::ScanAsyncBlocking { metadata, .. }
4843            | HydroNode::Fold { metadata, .. }
4844            | HydroNode::FoldKeyed { metadata, .. }
4845            | HydroNode::Reduce { metadata, .. }
4846            | HydroNode::ReduceKeyed { metadata, .. }
4847            | HydroNode::ReduceKeyedWatermark { metadata, .. }
4848            | HydroNode::ExternalInput { metadata, .. }
4849            | HydroNode::Network { metadata, .. }
4850            | HydroNode::Counter { metadata, .. } => metadata,
4851        }
4852    }
4853
4854    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4855        &mut self.metadata_mut().op
4856    }
4857
4858    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4859        match self {
4860            HydroNode::Placeholder => {
4861                panic!()
4862            }
4863            HydroNode::Cast { metadata, .. }
4864            | HydroNode::ObserveNonDet { metadata, .. }
4865            | HydroNode::Source { metadata, .. }
4866            | HydroNode::SingletonSource { metadata, .. }
4867            | HydroNode::CycleSource { metadata, .. }
4868            | HydroNode::Tee { metadata, .. }
4869            | HydroNode::Singleton { metadata, .. }
4870            | HydroNode::Partition { metadata, .. }
4871            | HydroNode::YieldConcat { metadata, .. }
4872            | HydroNode::BeginAtomic { metadata, .. }
4873            | HydroNode::EndAtomic { metadata, .. }
4874            | HydroNode::Batch { metadata, .. }
4875            | HydroNode::Chain { metadata, .. }
4876            | HydroNode::MergeOrdered { metadata, .. }
4877            | HydroNode::ChainFirst { metadata, .. }
4878            | HydroNode::CrossProduct { metadata, .. }
4879            | HydroNode::CrossSingleton { metadata, .. }
4880            | HydroNode::Join { metadata, .. }
4881            | HydroNode::JoinHalf { metadata, .. }
4882            | HydroNode::Difference { metadata, .. }
4883            | HydroNode::AntiJoin { metadata, .. }
4884            | HydroNode::ResolveFutures { metadata, .. }
4885            | HydroNode::ResolveFuturesBlocking { metadata, .. }
4886            | HydroNode::ResolveFuturesOrdered { metadata, .. }
4887            | HydroNode::Map { metadata, .. }
4888            | HydroNode::FlatMap { metadata, .. }
4889            | HydroNode::FlatMapStreamBlocking { metadata, .. }
4890            | HydroNode::Filter { metadata, .. }
4891            | HydroNode::FilterMap { metadata, .. }
4892            | HydroNode::DeferTick { metadata, .. }
4893            | HydroNode::Enumerate { metadata, .. }
4894            | HydroNode::Inspect { metadata, .. }
4895            | HydroNode::Unique { metadata, .. }
4896            | HydroNode::Sort { metadata, .. }
4897            | HydroNode::Scan { metadata, .. }
4898            | HydroNode::ScanAsyncBlocking { metadata, .. }
4899            | HydroNode::Fold { metadata, .. }
4900            | HydroNode::FoldKeyed { metadata, .. }
4901            | HydroNode::Reduce { metadata, .. }
4902            | HydroNode::ReduceKeyed { metadata, .. }
4903            | HydroNode::ReduceKeyedWatermark { metadata, .. }
4904            | HydroNode::ExternalInput { metadata, .. }
4905            | HydroNode::Network { metadata, .. }
4906            | HydroNode::Counter { metadata, .. } => metadata,
4907        }
4908    }
4909
4910    pub fn input(&self) -> Vec<&HydroNode> {
4911        match self {
4912            HydroNode::Placeholder => {
4913                panic!()
4914            }
4915            HydroNode::Source { .. }
4916            | HydroNode::SingletonSource { .. }
4917            | HydroNode::ExternalInput { .. }
4918            | HydroNode::CycleSource { .. }
4919            | HydroNode::Tee { .. }
4920            | HydroNode::Singleton { .. }
4921            | HydroNode::Partition { .. } => {
4922                // Tee/Partition should find their input in separate special ways
4923                vec![]
4924            }
4925            HydroNode::Cast { inner, .. }
4926            | HydroNode::ObserveNonDet { inner, .. }
4927            | HydroNode::YieldConcat { inner, .. }
4928            | HydroNode::BeginAtomic { inner, .. }
4929            | HydroNode::EndAtomic { inner, .. }
4930            | HydroNode::Batch { inner, .. } => {
4931                vec![inner]
4932            }
4933            HydroNode::Chain { first, second, .. } => {
4934                vec![first, second]
4935            }
4936            HydroNode::MergeOrdered { first, second, .. } => {
4937                vec![first, second]
4938            }
4939            HydroNode::ChainFirst { first, second, .. } => {
4940                vec![first, second]
4941            }
4942            HydroNode::CrossProduct { left, right, .. }
4943            | HydroNode::CrossSingleton { left, right, .. }
4944            | HydroNode::Join { left, right, .. }
4945            | HydroNode::JoinHalf { left, right, .. } => {
4946                vec![left, right]
4947            }
4948            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4949                vec![pos, neg]
4950            }
4951            HydroNode::Map { input, .. }
4952            | HydroNode::FlatMap { input, .. }
4953            | HydroNode::FlatMapStreamBlocking { input, .. }
4954            | HydroNode::Filter { input, .. }
4955            | HydroNode::FilterMap { input, .. }
4956            | HydroNode::Sort { input, .. }
4957            | HydroNode::DeferTick { input, .. }
4958            | HydroNode::Enumerate { input, .. }
4959            | HydroNode::Inspect { input, .. }
4960            | HydroNode::Unique { input, .. }
4961            | HydroNode::Network { input, .. }
4962            | HydroNode::Counter { input, .. }
4963            | HydroNode::ResolveFutures { input, .. }
4964            | HydroNode::ResolveFuturesBlocking { input, .. }
4965            | HydroNode::ResolveFuturesOrdered { input, .. }
4966            | HydroNode::Fold { input, .. }
4967            | HydroNode::FoldKeyed { input, .. }
4968            | HydroNode::Reduce { input, .. }
4969            | HydroNode::ReduceKeyed { input, .. }
4970            | HydroNode::Scan { input, .. }
4971            | HydroNode::ScanAsyncBlocking { input, .. } => {
4972                vec![input]
4973            }
4974            HydroNode::ReduceKeyedWatermark {
4975                input, watermark, ..
4976            } => {
4977                vec![input, watermark]
4978            }
4979        }
4980    }
4981
4982    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4983        self.input()
4984            .iter()
4985            .map(|input_node| input_node.metadata())
4986            .collect()
4987    }
4988
4989    /// Returns `true` if this node is a Tee or Partition whose inner Rc
4990    /// has other live references, meaning the upstream is already driven
4991    /// by another consumer and does not need a Null sink.
4992    pub fn is_shared_with_others(&self) -> bool {
4993        match self {
4994            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4995                Rc::strong_count(&inner.0) > 1
4996            }
4997            // A zero-output singleton() is valid in DFIR (it drains itself at
4998            // end of tick), so it doesn't need to be driven by another consumer.
4999            HydroNode::Singleton { .. } => false,
5000            _ => false,
5001        }
5002    }
5003
5004    pub fn print_root(&self) -> String {
5005        match self {
5006            HydroNode::Placeholder => {
5007                panic!()
5008            }
5009            HydroNode::Cast { .. } => "Cast()".to_owned(),
5010            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5011            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5012            HydroNode::SingletonSource {
5013                value,
5014                first_tick_only,
5015                ..
5016            } => format!(
5017                "SingletonSource({:?}, first_tick_only={})",
5018                value, first_tick_only
5019            ),
5020            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5021            HydroNode::Tee { inner, .. } => {
5022                format!("Tee({})", inner.0.borrow().print_root())
5023            }
5024            HydroNode::Singleton { inner, .. } => {
5025                format!("Singleton({})", inner.0.borrow().print_root())
5026            }
5027            HydroNode::Partition { f, is_true, .. } => {
5028                format!("Partition({:?}, is_true={})", f, is_true)
5029            }
5030            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5031            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5032            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5033            HydroNode::Batch { .. } => "Batch()".to_owned(),
5034            HydroNode::Chain { first, second, .. } => {
5035                format!("Chain({}, {})", first.print_root(), second.print_root())
5036            }
5037            HydroNode::MergeOrdered { first, second, .. } => {
5038                format!(
5039                    "MergeOrdered({}, {})",
5040                    first.print_root(),
5041                    second.print_root()
5042                )
5043            }
5044            HydroNode::ChainFirst { first, second, .. } => {
5045                format!(
5046                    "ChainFirst({}, {})",
5047                    first.print_root(),
5048                    second.print_root()
5049                )
5050            }
5051            HydroNode::CrossProduct { left, right, .. } => {
5052                format!(
5053                    "CrossProduct({}, {})",
5054                    left.print_root(),
5055                    right.print_root()
5056                )
5057            }
5058            HydroNode::CrossSingleton { left, right, .. } => {
5059                format!(
5060                    "CrossSingleton({}, {})",
5061                    left.print_root(),
5062                    right.print_root()
5063                )
5064            }
5065            HydroNode::Join { left, right, .. } => {
5066                format!("Join({}, {})", left.print_root(), right.print_root())
5067            }
5068            HydroNode::JoinHalf { left, right, .. } => {
5069                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5070            }
5071            HydroNode::Difference { pos, neg, .. } => {
5072                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5073            }
5074            HydroNode::AntiJoin { pos, neg, .. } => {
5075                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5076            }
5077            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5078            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5079            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5080            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5081            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5082            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5083            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5084            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5085            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5086            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5087            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5088            HydroNode::Unique { .. } => "Unique()".to_owned(),
5089            HydroNode::Sort { .. } => "Sort()".to_owned(),
5090            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5091            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5092            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5093                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5094            }
5095            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5096            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5097            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5098            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5099            HydroNode::Network { .. } => "Network()".to_owned(),
5100            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5101            HydroNode::Counter { tag, duration, .. } => {
5102                format!("Counter({:?}, {:?})", tag, duration)
5103            }
5104        }
5105    }
5106}
5107
5108#[cfg(feature = "build")]
5109fn instantiate_network<'a, D>(
5110    env: &mut D::InstantiateEnv,
5111    from_location: &LocationId,
5112    to_location: &LocationId,
5113    processes: &SparseSecondaryMap<LocationKey, D::Process>,
5114    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
5115    name: Option<&str>,
5116    networking_info: &crate::networking::NetworkingInfo,
5117) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
5118where
5119    D: Deploy<'a>,
5120{
5121    let ((sink, source), connect_fn) = match (from_location, to_location) {
5122        (&LocationId::Process(from), &LocationId::Process(to)) => {
5123            let from_node = processes
5124                .get(from)
5125                .unwrap_or_else(|| {
5126                    panic!("A process used in the graph was not instantiated: {}", from)
5127                })
5128                .clone();
5129            let to_node = processes
5130                .get(to)
5131                .unwrap_or_else(|| {
5132                    panic!("A process used in the graph was not instantiated: {}", to)
5133                })
5134                .clone();
5135
5136            let sink_port = from_node.next_port();
5137            let source_port = to_node.next_port();
5138
5139            (
5140                D::o2o_sink_source(
5141                    env,
5142                    &from_node,
5143                    &sink_port,
5144                    &to_node,
5145                    &source_port,
5146                    name,
5147                    networking_info,
5148                ),
5149                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
5150            )
5151        }
5152        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
5153            let from_node = processes
5154                .get(from)
5155                .unwrap_or_else(|| {
5156                    panic!("A process used in the graph was not instantiated: {}", from)
5157                })
5158                .clone();
5159            let to_node = clusters
5160                .get(to)
5161                .unwrap_or_else(|| {
5162                    panic!("A cluster used in the graph was not instantiated: {}", to)
5163                })
5164                .clone();
5165
5166            let sink_port = from_node.next_port();
5167            let source_port = to_node.next_port();
5168
5169            (
5170                D::o2m_sink_source(
5171                    env,
5172                    &from_node,
5173                    &sink_port,
5174                    &to_node,
5175                    &source_port,
5176                    name,
5177                    networking_info,
5178                ),
5179                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
5180            )
5181        }
5182        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
5183            let from_node = clusters
5184                .get(from)
5185                .unwrap_or_else(|| {
5186                    panic!("A cluster used in the graph was not instantiated: {}", from)
5187                })
5188                .clone();
5189            let to_node = processes
5190                .get(to)
5191                .unwrap_or_else(|| {
5192                    panic!("A process used in the graph was not instantiated: {}", to)
5193                })
5194                .clone();
5195
5196            let sink_port = from_node.next_port();
5197            let source_port = to_node.next_port();
5198
5199            (
5200                D::m2o_sink_source(
5201                    env,
5202                    &from_node,
5203                    &sink_port,
5204                    &to_node,
5205                    &source_port,
5206                    name,
5207                    networking_info,
5208                ),
5209                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
5210            )
5211        }
5212        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
5213            let from_node = clusters
5214                .get(from)
5215                .unwrap_or_else(|| {
5216                    panic!("A cluster used in the graph was not instantiated: {}", from)
5217                })
5218                .clone();
5219            let to_node = clusters
5220                .get(to)
5221                .unwrap_or_else(|| {
5222                    panic!("A cluster used in the graph was not instantiated: {}", to)
5223                })
5224                .clone();
5225
5226            let sink_port = from_node.next_port();
5227            let source_port = to_node.next_port();
5228
5229            (
5230                D::m2m_sink_source(
5231                    env,
5232                    &from_node,
5233                    &sink_port,
5234                    &to_node,
5235                    &source_port,
5236                    name,
5237                    networking_info,
5238                ),
5239                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
5240            )
5241        }
5242        (LocationId::Tick(_, _), _) => panic!(),
5243        (_, LocationId::Tick(_, _)) => panic!(),
5244        (LocationId::Atomic(_), _) => panic!(),
5245        (_, LocationId::Atomic(_)) => panic!(),
5246    };
5247    (sink, source, connect_fn)
5248}
5249
5250#[cfg(test)]
5251mod serde_test;
5252
5253#[cfg(test)]
5254mod test {
5255    use std::mem::size_of;
5256
5257    use stageleft::{QuotedWithContext, q};
5258
5259    use super::*;
5260
5261    #[test]
5262    #[cfg_attr(
5263        not(feature = "build"),
5264        ignore = "expects inclusion of feature-gated fields"
5265    )]
5266    fn hydro_node_size() {
5267        assert_eq!(size_of::<HydroNode>(), 256);
5268    }
5269
5270    #[test]
5271    #[cfg_attr(
5272        not(feature = "build"),
5273        ignore = "expects inclusion of feature-gated fields"
5274    )]
5275    fn hydro_root_size() {
5276        assert_eq!(size_of::<HydroRoot>(), 136);
5277    }
5278
5279    #[test]
5280    fn test_simplify_q_macro_basic() {
5281        // Test basic non-q! expression
5282        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
5283        let result = simplify_q_macro(simple_expr.clone());
5284        assert_eq!(result, simple_expr);
5285    }
5286
5287    #[test]
5288    fn test_simplify_q_macro_actual_stageleft_call() {
5289        // Test a simplified version of what a real stageleft call might look like
5290        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
5291        let result = simplify_q_macro(stageleft_call);
5292        // This should be processed by our visitor and simplified to q!(...)
5293        // since we detect the stageleft::runtime_support::fn_* pattern
5294        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5295    }
5296
5297    #[test]
5298    fn test_closure_no_pipe_at_start() {
5299        // Test a closure that does not start with a pipe
5300        let stageleft_call = q!({
5301            let foo = 123;
5302            move |b: usize| b + foo
5303        })
5304        .splice_fn1_ctx(&());
5305        let result = simplify_q_macro(stageleft_call);
5306        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5307    }
5308}