Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39/// A closure expression bundled with any singleton references it captures.
40///
41/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
42/// alongside the closure's expression. This allows per-closure tracking of singleton
43/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
44pub struct ClosureExpr {
45    pub(crate) expr: DebugExpr,
46    /// Each entry is `(reference_node, is_mut)`.
47    /// The index in the Vec determines the ident name via [`handoff_ref_ident`].
48    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
49}
50
51impl Clone for ClosureExpr {
52    fn clone(&self) -> Self {
53        Self {
54            expr: self.expr.clone(),
55            singleton_refs: self
56                .singleton_refs
57                .iter()
58                .map(|(node, is_mut)| {
59                    let HydroNode::Reference {
60                        inner,
61                        kind,
62                        metadata,
63                    } = node
64                    else {
65                        panic!("singleton_refs should only contain HydroNode::Reference");
66                    };
67                    (
68                        HydroNode::Reference {
69                            inner: SharedNode(Rc::clone(&inner.0)),
70                            kind: *kind,
71                            metadata: metadata.clone(),
72                        },
73                        *is_mut,
74                    )
75                })
76                .collect(),
77        }
78    }
79}
80
81impl Hash for ClosureExpr {
82    fn hash<H: Hasher>(&self, state: &mut H) {
83        self.expr.hash(state);
84        // singleton_refs are structural children (like HydroIrMetadata), not
85        // identity-defining. Two closures with the same expr but different
86        // captured refs are the same closure text — the refs only affect codegen.
87    }
88}
89
90impl serde::Serialize for ClosureExpr {
91    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
92        use serde::ser::SerializeStruct;
93        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
94        s.serialize_field("expr", &self.expr)?;
95        s.serialize_field(
96            "singleton_refs",
97            &SerializableSingletonRefs(&self.singleton_refs),
98        )?;
99        s.end()
100    }
101}
102
103struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
104
105impl serde::Serialize for SerializableSingletonRefs<'_> {
106    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
107        use serde::ser::SerializeSeq;
108        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
109        for (node, is_mut) in self.0.iter() {
110            seq.serialize_element(&(node, is_mut))?;
111        }
112        seq.end()
113    }
114}
115
116impl Debug for ClosureExpr {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        Debug::fmt(&self.expr, f)
119    }
120}
121
122impl Display for ClosureExpr {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        Display::fmt(&self.expr, f)
125    }
126}
127
128impl From<syn::Expr> for ClosureExpr {
129    fn from(expr: syn::Expr) -> Self {
130        Self {
131            expr: DebugExpr(Box::new(expr)),
132            singleton_refs: Vec::new(),
133        }
134    }
135}
136
137impl From<DebugExpr> for ClosureExpr {
138    fn from(expr: DebugExpr) -> Self {
139        Self {
140            expr,
141            singleton_refs: Vec::new(),
142        }
143    }
144}
145
146impl ClosureExpr {
147    pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
148        Self {
149            expr,
150            singleton_refs,
151        }
152    }
153
154    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
155        Self {
156            expr: self.expr.clone(),
157            singleton_refs: self
158                .singleton_refs
159                .iter()
160                .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
161                .collect(),
162        }
163    }
164
165    pub fn transform_children(
166        &mut self,
167        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
168        seen_tees: &mut SeenSharedNodes,
169    ) {
170        for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
171            transform(ref_node, seen_tees);
172        }
173    }
174
175    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
176    /// replacing local singleton ref idents with `#{N} dfir_ident` or `#{N} mut dfir_ident` references.
177    #[cfg(feature = "build")]
178    pub fn emit_tokens(
179        &self,
180        ident_stack: &mut Vec<syn::Ident>,
181        access_counters: &mut HashMap<*const RefCell<HydroNode>, u32>,
182    ) -> TokenStream {
183        if self.singleton_refs.is_empty() {
184            self.expr.0.to_token_stream()
185        } else {
186            assert!(
187                ident_stack.len() >= self.singleton_refs.len(),
188                "ident_stack has {} entries but expected at least {} for singleton_refs",
189                ident_stack.len(),
190                self.singleton_refs.len()
191            );
192            let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
193
194            let mut let_bindings = Vec::new();
195            for ((i, (node, is_mut)), ref_ident) in
196                self.singleton_refs.iter().enumerate().zip(ref_idents)
197            {
198                let HydroNode::Reference { inner, .. } = node else {
199                    panic!("singleton_refs should only contain HydroNode::Reference");
200                };
201                let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
202                let counter = access_counters.entry(ptr).or_insert(0);
203                let group = if *is_mut {
204                    *counter += 1;
205                    let g = *counter;
206                    *counter += 1;
207                    g
208                } else {
209                    *counter
210                };
211
212                // TODO(mingwei): proper spanning?
213                let local_ident = handoff_ref_ident(i);
214                let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
215                let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
216                let mut_token = is_mut.then(|| quote!(mut));
217                let binding = quote! {
218                    let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
219                };
220                let_bindings.push(binding);
221            }
222
223            let expr = &self.expr.0;
224            quote! {
225                {
226                    #( #let_bindings )*
227                    #expr
228                }
229            }
230        }
231    }
232}
233
234/// Wrapper that displays only the tokens of a parsed expr.
235///
236/// Boxes `syn::Type` which is ~240 bytes.
237#[derive(Clone, Hash)]
238pub struct DebugExpr(pub Box<syn::Expr>);
239
240impl serde::Serialize for DebugExpr {
241    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
242        serializer.serialize_str(&self.to_string())
243    }
244}
245
246impl From<syn::Expr> for DebugExpr {
247    fn from(expr: syn::Expr) -> Self {
248        Self(Box::new(expr))
249    }
250}
251
252impl Deref for DebugExpr {
253    type Target = syn::Expr;
254
255    fn deref(&self) -> &Self::Target {
256        &self.0
257    }
258}
259
260impl ToTokens for DebugExpr {
261    fn to_tokens(&self, tokens: &mut TokenStream) {
262        self.0.to_tokens(tokens);
263    }
264}
265
266impl Debug for DebugExpr {
267    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268        write!(f, "{}", self.0.to_token_stream())
269    }
270}
271
272impl Display for DebugExpr {
273    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274        let original = self.0.as_ref().clone();
275        let simplified = simplify_q_macro(original);
276
277        // For now, just use quote formatting without trying to parse as a statement
278        // This avoids the syn::parse_quote! issues entirely
279        write!(f, "q!({})", quote::quote!(#simplified))
280    }
281}
282
283/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
284fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
285    if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
286        // Look for calls to stageleft::runtime_support::fn*
287        && is_stageleft_runtime_support_call(&path_expr.path)
288        && let syn::Expr::Block(b) = &call.args[0]
289        && b.block.stmts.len() == 3
290        && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
291    // skip the first two, which are imports
292    {
293        let mut e = e.clone();
294        while let syn::Expr::Block(ref mut block) = e
295            && block.block.stmts.len() == 1
296            && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
297        {
298            e = inner_e;
299        }
300
301        e
302    } else {
303        expr
304    }
305}
306
307fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
308    // Check if this is a call to stageleft::runtime_support::fn*
309    if let Some(last_segment) = path.segments.last() {
310        let fn_name = last_segment.ident.to_string();
311        path.segments.len() > 2
312            && path.segments[0].ident == "stageleft"
313            && path.segments[1].ident == "runtime_support"
314            && fn_name.contains("_type_hint")
315    } else {
316        false
317    }
318}
319
320/// Debug displays the type's tokens.
321///
322/// Boxes `syn::Type` which is ~320 bytes.
323#[derive(Clone, PartialEq, Eq, Hash)]
324pub struct DebugType(pub Box<syn::Type>);
325
326impl From<syn::Type> for DebugType {
327    fn from(t: syn::Type) -> Self {
328        Self(Box::new(t))
329    }
330}
331
332impl Deref for DebugType {
333    type Target = syn::Type;
334
335    fn deref(&self) -> &Self::Target {
336        &self.0
337    }
338}
339
340impl ToTokens for DebugType {
341    fn to_tokens(&self, tokens: &mut TokenStream) {
342        self.0.to_tokens(tokens);
343    }
344}
345
346impl Debug for DebugType {
347    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348        write!(f, "{}", self.0.to_token_stream())
349    }
350}
351
352impl serde::Serialize for DebugType {
353    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
354        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
355    }
356}
357
358fn serialize_backtrace_as_span<S: serde::Serializer>(
359    backtrace: &Backtrace,
360    serializer: S,
361) -> Result<S::Ok, S::Error> {
362    match backtrace.format_span() {
363        Some(span) => serializer.serialize_some(&span),
364        None => serializer.serialize_none(),
365    }
366}
367
368fn serialize_ident<S: serde::Serializer>(
369    ident: &syn::Ident,
370    serializer: S,
371) -> Result<S::Ok, S::Error> {
372    serializer.serialize_str(&ident.to_string())
373}
374
375pub enum DebugInstantiate {
376    Building,
377    Finalized(Box<DebugInstantiateFinalized>),
378}
379
380impl serde::Serialize for DebugInstantiate {
381    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
382        match self {
383            DebugInstantiate::Building => {
384                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
385            }
386            DebugInstantiate::Finalized(_) => {
387                panic!(
388                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
389                )
390            }
391        }
392    }
393}
394
395#[cfg_attr(
396    not(feature = "build"),
397    expect(
398        dead_code,
399        reason = "sink, source unused without `feature = \"build\"`."
400    )
401)]
402pub struct DebugInstantiateFinalized {
403    sink: syn::Expr,
404    source: syn::Expr,
405    connect_fn: Option<Box<dyn FnOnce()>>,
406}
407
408impl From<DebugInstantiateFinalized> for DebugInstantiate {
409    fn from(f: DebugInstantiateFinalized) -> Self {
410        Self::Finalized(Box::new(f))
411    }
412}
413
414impl Debug for DebugInstantiate {
415    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416        write!(f, "<network instantiate>")
417    }
418}
419
420impl Hash for DebugInstantiate {
421    fn hash<H: Hasher>(&self, _state: &mut H) {
422        // Do nothing
423    }
424}
425
426impl Clone for DebugInstantiate {
427    fn clone(&self) -> Self {
428        match self {
429            DebugInstantiate::Building => DebugInstantiate::Building,
430            DebugInstantiate::Finalized(_) => {
431                panic!("DebugInstantiate::Finalized should not be cloned")
432            }
433        }
434    }
435}
436
437/// Tracks the instantiation state of a `ClusterMembers` source.
438///
439/// During `compile_network`, the first `ClusterMembers` node for a given
440/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
441/// receives the expression returned by `Deploy::cluster_membership_stream`.
442/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
443/// during code-gen they simply reference the tee output of the first node
444/// instead of creating a redundant `source_stream`.
445#[derive(Debug, Hash, Clone, serde::Serialize)]
446pub enum ClusterMembersState {
447    /// Not yet instantiated.
448    Uninit,
449    /// The primary instance: holds the stream expression and will emit
450    /// `source_stream(expr) -> tee()` during code-gen.
451    Stream(DebugExpr),
452    /// A secondary instance that references the tee output of the primary.
453    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
454    /// can derive the deterministic tee ident without extra state.
455    Tee(LocationId, LocationId),
456}
457
458/// A source in a Hydro graph, where data enters the graph.
459#[derive(Debug, Hash, Clone, serde::Serialize)]
460pub enum HydroSource {
461    Stream(DebugExpr),
462    ExternalNetwork(),
463    Iter(DebugExpr),
464    Spin(),
465    ClusterMembers(LocationId, ClusterMembersState),
466    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
467    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
468}
469
470#[cfg(feature = "build")]
471/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
472/// and simulations.
473///
474/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
475/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
476pub trait DfirBuilder {
477    /// Whether the representation of singletons should include intermediate states.
478    fn singleton_intermediates(&self) -> bool;
479
480    /// Gets the DFIR builder for the given location, creating it if necessary.
481    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
482
483    #[expect(clippy::too_many_arguments, reason = "TODO")]
484    fn batch(
485        &mut self,
486        in_ident: syn::Ident,
487        in_location: &LocationId,
488        in_kind: &CollectionKind,
489        out_ident: &syn::Ident,
490        out_location: &LocationId,
491        op_meta: &HydroIrOpMetadata,
492        fold_hooked_idents: &HashSet<String>,
493    );
494    fn yield_from_tick(
495        &mut self,
496        in_ident: syn::Ident,
497        in_location: &LocationId,
498        in_kind: &CollectionKind,
499        out_ident: &syn::Ident,
500        out_location: &LocationId,
501    );
502
503    fn begin_atomic(
504        &mut self,
505        in_ident: syn::Ident,
506        in_location: &LocationId,
507        in_kind: &CollectionKind,
508        out_ident: &syn::Ident,
509        out_location: &LocationId,
510        op_meta: &HydroIrOpMetadata,
511    );
512    fn end_atomic(
513        &mut self,
514        in_ident: syn::Ident,
515        in_location: &LocationId,
516        in_kind: &CollectionKind,
517        out_ident: &syn::Ident,
518    );
519
520    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
521    fn observe_nondet(
522        &mut self,
523        trusted: bool,
524        location: &LocationId,
525        in_ident: syn::Ident,
526        in_kind: &CollectionKind,
527        out_ident: &syn::Ident,
528        out_kind: &CollectionKind,
529        op_meta: &HydroIrOpMetadata,
530    );
531
532    #[expect(clippy::too_many_arguments, reason = "TODO")]
533    fn merge_ordered(
534        &mut self,
535        location: &LocationId,
536        first_ident: syn::Ident,
537        second_ident: syn::Ident,
538        out_ident: &syn::Ident,
539        in_kind: &CollectionKind,
540        op_meta: &HydroIrOpMetadata,
541        operator_tag: Option<&str>,
542    );
543
544    #[expect(clippy::too_many_arguments, reason = "TODO")]
545    fn create_network(
546        &mut self,
547        from: &LocationId,
548        to: &LocationId,
549        input_ident: syn::Ident,
550        out_ident: &syn::Ident,
551        serialize: Option<&DebugExpr>,
552        sink: syn::Expr,
553        source: syn::Expr,
554        deserialize: Option<&DebugExpr>,
555        tag_id: StmtId,
556        networking_info: &crate::networking::NetworkingInfo,
557    );
558
559    fn create_external_source(
560        &mut self,
561        on: &LocationId,
562        source_expr: syn::Expr,
563        out_ident: &syn::Ident,
564        deserialize: Option<&DebugExpr>,
565        tag_id: StmtId,
566    );
567
568    fn create_external_output(
569        &mut self,
570        on: &LocationId,
571        sink_expr: syn::Expr,
572        input_ident: &syn::Ident,
573        serialize: Option<&DebugExpr>,
574        tag_id: StmtId,
575    );
576
577    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
578    /// Returns the new input ident to use for the fold if a hook was emitted.
579    fn emit_fold_hook(
580        &mut self,
581        location: &LocationId,
582        in_ident: &syn::Ident,
583        in_kind: &CollectionKind,
584        op_meta: &HydroIrOpMetadata,
585    ) -> Option<syn::Ident>;
586
587    /// Inserts necessary code to validate a manual assertion that at this point the
588    /// input live collection is consistent. In production, this is a no-op, but in simulation
589    /// this will (not yet implemented) inject assertions that validate consistency.
590    fn assert_is_consistent(
591        &mut self,
592        trusted: bool,
593        location: &LocationId,
594        in_ident: syn::Ident,
595        out_ident: &syn::Ident,
596    );
597}
598
599#[cfg(feature = "build")]
600impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
601    fn singleton_intermediates(&self) -> bool {
602        false
603    }
604
605    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
606        self.entry(location.root().key())
607            .expect("location was removed")
608            .or_default()
609    }
610
611    fn batch(
612        &mut self,
613        in_ident: syn::Ident,
614        in_location: &LocationId,
615        in_kind: &CollectionKind,
616        out_ident: &syn::Ident,
617        _out_location: &LocationId,
618        _op_meta: &HydroIrOpMetadata,
619        _fold_hooked_idents: &HashSet<String>,
620    ) {
621        let builder = self.get_dfir_mut(in_location.root());
622        if in_kind.is_bounded()
623            && matches!(
624                in_kind,
625                CollectionKind::Singleton { .. }
626                    | CollectionKind::Optional { .. }
627                    | CollectionKind::KeyedSingleton { .. }
628            )
629        {
630            assert!(in_location.is_top_level());
631            builder.add_dfir(
632                parse_quote! {
633                    #out_ident = #in_ident -> persist::<'static>();
634                },
635                None,
636                None,
637            );
638        } else {
639            builder.add_dfir(
640                parse_quote! {
641                    #out_ident = #in_ident;
642                },
643                None,
644                None,
645            );
646        }
647    }
648
649    fn yield_from_tick(
650        &mut self,
651        in_ident: syn::Ident,
652        in_location: &LocationId,
653        _in_kind: &CollectionKind,
654        out_ident: &syn::Ident,
655        _out_location: &LocationId,
656    ) {
657        let builder = self.get_dfir_mut(in_location.root());
658        builder.add_dfir(
659            parse_quote! {
660                #out_ident = #in_ident;
661            },
662            None,
663            None,
664        );
665    }
666
667    fn begin_atomic(
668        &mut self,
669        in_ident: syn::Ident,
670        in_location: &LocationId,
671        _in_kind: &CollectionKind,
672        out_ident: &syn::Ident,
673        _out_location: &LocationId,
674        _op_meta: &HydroIrOpMetadata,
675    ) {
676        let builder = self.get_dfir_mut(in_location.root());
677        builder.add_dfir(
678            parse_quote! {
679                #out_ident = #in_ident;
680            },
681            None,
682            None,
683        );
684    }
685
686    fn end_atomic(
687        &mut self,
688        in_ident: syn::Ident,
689        in_location: &LocationId,
690        _in_kind: &CollectionKind,
691        out_ident: &syn::Ident,
692    ) {
693        let builder = self.get_dfir_mut(in_location.root());
694        builder.add_dfir(
695            parse_quote! {
696                #out_ident = #in_ident;
697            },
698            None,
699            None,
700        );
701    }
702
703    fn observe_nondet(
704        &mut self,
705        _trusted: bool,
706        location: &LocationId,
707        in_ident: syn::Ident,
708        _in_kind: &CollectionKind,
709        out_ident: &syn::Ident,
710        _out_kind: &CollectionKind,
711        _op_meta: &HydroIrOpMetadata,
712    ) {
713        let builder = self.get_dfir_mut(location);
714        builder.add_dfir(
715            parse_quote! {
716                #out_ident = #in_ident;
717            },
718            None,
719            None,
720        );
721    }
722
723    fn merge_ordered(
724        &mut self,
725        location: &LocationId,
726        first_ident: syn::Ident,
727        second_ident: syn::Ident,
728        out_ident: &syn::Ident,
729        _in_kind: &CollectionKind,
730        _op_meta: &HydroIrOpMetadata,
731        operator_tag: Option<&str>,
732    ) {
733        let builder = self.get_dfir_mut(location);
734        builder.add_dfir(
735            parse_quote! {
736                #out_ident = union();
737                #first_ident -> [0]#out_ident;
738                #second_ident -> [1]#out_ident;
739            },
740            None,
741            operator_tag,
742        );
743    }
744
745    fn create_network(
746        &mut self,
747        from: &LocationId,
748        to: &LocationId,
749        input_ident: syn::Ident,
750        out_ident: &syn::Ident,
751        serialize: Option<&DebugExpr>,
752        sink: syn::Expr,
753        source: syn::Expr,
754        deserialize: Option<&DebugExpr>,
755        tag_id: StmtId,
756        _networking_info: &crate::networking::NetworkingInfo,
757    ) {
758        let sender_builder = self.get_dfir_mut(from);
759        if let Some(serialize_pipeline) = serialize {
760            sender_builder.add_dfir(
761                parse_quote! {
762                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
763                },
764                None,
765                // operator tag separates send and receive, which otherwise have the same next_stmt_id
766                Some(&format!("send{}", tag_id)),
767            );
768        } else {
769            sender_builder.add_dfir(
770                parse_quote! {
771                    #input_ident -> dest_sink(#sink);
772                },
773                None,
774                Some(&format!("send{}", tag_id)),
775            );
776        }
777
778        let receiver_builder = self.get_dfir_mut(to);
779        if let Some(deserialize_pipeline) = deserialize {
780            receiver_builder.add_dfir(
781                parse_quote! {
782                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
783                },
784                None,
785                Some(&format!("recv{}", tag_id)),
786            );
787        } else {
788            receiver_builder.add_dfir(
789                parse_quote! {
790                    #out_ident = source_stream(#source);
791                },
792                None,
793                Some(&format!("recv{}", tag_id)),
794            );
795        }
796    }
797
798    fn create_external_source(
799        &mut self,
800        on: &LocationId,
801        source_expr: syn::Expr,
802        out_ident: &syn::Ident,
803        deserialize: Option<&DebugExpr>,
804        tag_id: StmtId,
805    ) {
806        let receiver_builder = self.get_dfir_mut(on);
807        if let Some(deserialize_pipeline) = deserialize {
808            receiver_builder.add_dfir(
809                parse_quote! {
810                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
811                },
812                None,
813                Some(&format!("recv{}", tag_id)),
814            );
815        } else {
816            receiver_builder.add_dfir(
817                parse_quote! {
818                    #out_ident = source_stream(#source_expr);
819                },
820                None,
821                Some(&format!("recv{}", tag_id)),
822            );
823        }
824    }
825
826    fn create_external_output(
827        &mut self,
828        on: &LocationId,
829        sink_expr: syn::Expr,
830        input_ident: &syn::Ident,
831        serialize: Option<&DebugExpr>,
832        tag_id: StmtId,
833    ) {
834        let sender_builder = self.get_dfir_mut(on);
835        if let Some(serialize_fn) = serialize {
836            sender_builder.add_dfir(
837                parse_quote! {
838                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
839                },
840                None,
841                // operator tag separates send and receive, which otherwise have the same next_stmt_id
842                Some(&format!("send{}", tag_id)),
843            );
844        } else {
845            sender_builder.add_dfir(
846                parse_quote! {
847                    #input_ident -> dest_sink(#sink_expr);
848                },
849                None,
850                Some(&format!("send{}", tag_id)),
851            );
852        }
853    }
854
855    fn emit_fold_hook(
856        &mut self,
857        _location: &LocationId,
858        _in_ident: &syn::Ident,
859        _in_kind: &CollectionKind,
860        _op_meta: &HydroIrOpMetadata,
861    ) -> Option<syn::Ident> {
862        None
863    }
864
865    fn assert_is_consistent(
866        &mut self,
867        _trusted: bool,
868        location: &LocationId,
869        in_ident: syn::Ident,
870        out_ident: &syn::Ident,
871    ) {
872        let builder = self.get_dfir_mut(location);
873        builder.add_dfir(
874            parse_quote! {
875                #out_ident = #in_ident;
876            },
877            None,
878            None,
879        );
880    }
881}
882
883#[cfg(feature = "build")]
884pub enum BuildersOrCallback<'a, L, N>
885where
886    L: FnMut(&mut HydroRoot, &mut StmtId),
887    N: FnMut(&mut HydroNode, &mut StmtId),
888{
889    Builders(&'a mut dyn DfirBuilder),
890    Callback(L, N),
891}
892
893/// An root in a Hydro graph, which is an pipeline that doesn't emit
894/// any downstream values. Traversals over the dataflow graph and
895/// generating DFIR IR start from roots.
896#[derive(Debug, Hash, serde::Serialize)]
897pub enum HydroRoot {
898    ForEach {
899        f: ClosureExpr,
900        input: Box<HydroNode>,
901        op_metadata: HydroIrOpMetadata,
902    },
903    SendExternal {
904        to_external_key: LocationKey,
905        to_port_id: ExternalPortId,
906        to_many: bool,
907        unpaired: bool,
908        serialize_fn: Option<DebugExpr>,
909        instantiate_fn: DebugInstantiate,
910        input: Box<HydroNode>,
911        op_metadata: HydroIrOpMetadata,
912    },
913    DestSink {
914        sink: DebugExpr,
915        input: Box<HydroNode>,
916        op_metadata: HydroIrOpMetadata,
917    },
918    CycleSink {
919        cycle_id: CycleId,
920        input: Box<HydroNode>,
921        op_metadata: HydroIrOpMetadata,
922    },
923    EmbeddedOutput {
924        #[serde(serialize_with = "serialize_ident")]
925        ident: syn::Ident,
926        input: Box<HydroNode>,
927        op_metadata: HydroIrOpMetadata,
928    },
929    Null {
930        input: Box<HydroNode>,
931        op_metadata: HydroIrOpMetadata,
932    },
933}
934
935impl HydroRoot {
936    #[cfg(feature = "build")]
937    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
938    pub fn compile_network<'a, D>(
939        &mut self,
940        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
941        seen_tees: &mut SeenSharedNodes,
942        seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
943        processes: &SparseSecondaryMap<LocationKey, D::Process>,
944        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
945        externals: &SparseSecondaryMap<LocationKey, D::External>,
946        env: &mut D::InstantiateEnv,
947    ) where
948        D: Deploy<'a>,
949    {
950        let refcell_extra_stmts = RefCell::new(extra_stmts);
951        let refcell_env = RefCell::new(env);
952        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
953        self.transform_bottom_up(
954            &mut |l| {
955                if let HydroRoot::SendExternal {
956                    input,
957                    to_external_key,
958                    to_port_id,
959                    to_many,
960                    unpaired,
961                    instantiate_fn,
962                    ..
963                } = l
964                {
965                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
966                        DebugInstantiate::Building => {
967                            let to_node = externals
968                                .get(*to_external_key)
969                                .unwrap_or_else(|| {
970                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
971                                })
972                                .clone();
973
974                            match input.metadata().location_id.root() {
975                                &LocationId::Process(process_key) => {
976                                    if *to_many {
977                                        (
978                                            (
979                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
980                                                parse_quote!(DUMMY),
981                                            ),
982                                            Box::new(|| {}) as Box<dyn FnOnce()>,
983                                        )
984                                    } else {
985                                        let from_node = processes
986                                            .get(process_key)
987                                            .unwrap_or_else(|| {
988                                                panic!("A process used in the graph was not instantiated: {}", process_key)
989                                            })
990                                            .clone();
991
992                                        let sink_port = from_node.next_port();
993                                        let source_port = to_node.next_port();
994
995                                        if *unpaired {
996                                            use stageleft::quote_type;
997                                            use tokio_util::codec::LengthDelimitedCodec;
998
999                                            to_node.register(*to_port_id, source_port.clone());
1000
1001                                            let _ = D::e2o_source(
1002                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1003                                                &to_node, &source_port,
1004                                                &from_node, &sink_port,
1005                                                &quote_type::<LengthDelimitedCodec>(),
1006                                                format!("{}_{}", *to_external_key, *to_port_id)
1007                                            );
1008                                        }
1009
1010                                        (
1011                                            (
1012                                                D::o2e_sink(
1013                                                    &from_node,
1014                                                    &sink_port,
1015                                                    &to_node,
1016                                                    &source_port,
1017                                                    format!("{}_{}", *to_external_key, *to_port_id)
1018                                                ),
1019                                                parse_quote!(DUMMY),
1020                                            ),
1021                                            if *unpaired {
1022                                                D::e2o_connect(
1023                                                    &to_node,
1024                                                    &source_port,
1025                                                    &from_node,
1026                                                    &sink_port,
1027                                                    *to_many,
1028                                                    NetworkHint::Auto,
1029                                                )
1030                                            } else {
1031                                                Box::new(|| {}) as Box<dyn FnOnce()>
1032                                            },
1033                                        )
1034                                    }
1035                                }
1036                                LocationId::Cluster(cluster_key) => {
1037                                    let from_node = clusters
1038                                        .get(*cluster_key)
1039                                        .unwrap_or_else(|| {
1040                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1041                                        })
1042                                        .clone();
1043
1044                                    let sink_port = from_node.next_port();
1045                                    let source_port = to_node.next_port();
1046
1047                                    if *unpaired {
1048                                        to_node.register(*to_port_id, source_port.clone());
1049                                    }
1050
1051                                    (
1052                                        (
1053                                            D::m2e_sink(
1054                                                &from_node,
1055                                                &sink_port,
1056                                                &to_node,
1057                                                &source_port,
1058                                                format!("{}_{}", *to_external_key, *to_port_id)
1059                                            ),
1060                                            parse_quote!(DUMMY),
1061                                        ),
1062                                        Box::new(|| {}) as Box<dyn FnOnce()>,
1063                                    )
1064                                }
1065                                _ => panic!()
1066                            }
1067                        },
1068
1069                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1070                    };
1071
1072                    *instantiate_fn = DebugInstantiateFinalized {
1073                        sink: sink_expr,
1074                        source: source_expr,
1075                        connect_fn: Some(connect_fn),
1076                    }
1077                    .into();
1078                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1079                    let element_type = match &input.metadata().collection_kind {
1080                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1081                        _ => panic!("Embedded output must have Stream collection kind"),
1082                    };
1083                    let location_key = match input.metadata().location_id.root() {
1084                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1085                        _ => panic!("Embedded output must be on a process or cluster"),
1086                    };
1087                    D::register_embedded_output(
1088                        &mut refcell_env.borrow_mut(),
1089                        location_key,
1090                        ident,
1091                        &element_type,
1092                    );
1093                }
1094            },
1095            &mut |n| {
1096                if let HydroNode::Network {
1097                    name,
1098                    networking_info,
1099                    input,
1100                    instantiate_fn,
1101                    metadata,
1102                    ..
1103                } = n
1104                {
1105                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1106                        DebugInstantiate::Building => instantiate_network::<D>(
1107                            &mut refcell_env.borrow_mut(),
1108                            input.metadata().location_id.root(),
1109                            metadata.location_id.root(),
1110                            processes,
1111                            clusters,
1112                            name.as_deref(),
1113                            networking_info,
1114                        ),
1115
1116                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1117                    };
1118
1119                    *instantiate_fn = DebugInstantiateFinalized {
1120                        sink: sink_expr,
1121                        source: source_expr,
1122                        connect_fn: Some(connect_fn),
1123                    }
1124                    .into();
1125                } else if let HydroNode::ExternalInput {
1126                    from_external_key,
1127                    from_port_id,
1128                    from_many,
1129                    codec_type,
1130                    port_hint,
1131                    instantiate_fn,
1132                    metadata,
1133                    ..
1134                } = n
1135                {
1136                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1137                        DebugInstantiate::Building => {
1138                            let from_node = externals
1139                                .get(*from_external_key)
1140                                .unwrap_or_else(|| {
1141                                    panic!(
1142                                        "A external used in the graph was not instantiated: {}",
1143                                        from_external_key,
1144                                    )
1145                                })
1146                                .clone();
1147
1148                            match metadata.location_id.root() {
1149                                &LocationId::Process(process_key) => {
1150                                    let to_node = processes
1151                                        .get(process_key)
1152                                        .unwrap_or_else(|| {
1153                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1154                                        })
1155                                        .clone();
1156
1157                                    let sink_port = from_node.next_port();
1158                                    let source_port = to_node.next_port();
1159
1160                                    from_node.register(*from_port_id, sink_port.clone());
1161
1162                                    (
1163                                        (
1164                                            parse_quote!(DUMMY),
1165                                            if *from_many {
1166                                                D::e2o_many_source(
1167                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1168                                                    &to_node, &source_port,
1169                                                    codec_type.0.as_ref(),
1170                                                    format!("{}_{}", *from_external_key, *from_port_id)
1171                                                )
1172                                            } else {
1173                                                D::e2o_source(
1174                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1175                                                    &from_node, &sink_port,
1176                                                    &to_node, &source_port,
1177                                                    codec_type.0.as_ref(),
1178                                                    format!("{}_{}", *from_external_key, *from_port_id)
1179                                                )
1180                                            },
1181                                        ),
1182                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1183                                    )
1184                                }
1185                                LocationId::Cluster(cluster_key) => {
1186                                    let to_node = clusters
1187                                        .get(*cluster_key)
1188                                        .unwrap_or_else(|| {
1189                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1190                                        })
1191                                        .clone();
1192
1193                                    let sink_port = from_node.next_port();
1194                                    let source_port = to_node.next_port();
1195
1196                                    from_node.register(*from_port_id, sink_port.clone());
1197
1198                                    (
1199                                        (
1200                                            parse_quote!(DUMMY),
1201                                            D::e2m_source(
1202                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1203                                                &from_node, &sink_port,
1204                                                &to_node, &source_port,
1205                                                codec_type.0.as_ref(),
1206                                                format!("{}_{}", *from_external_key, *from_port_id)
1207                                            ),
1208                                        ),
1209                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1210                                    )
1211                                }
1212                                _ => panic!()
1213                            }
1214                        },
1215
1216                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1217                    };
1218
1219                    *instantiate_fn = DebugInstantiateFinalized {
1220                        sink: sink_expr,
1221                        source: source_expr,
1222                        connect_fn: Some(connect_fn),
1223                    }
1224                    .into();
1225                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1226                    let element_type = match &metadata.collection_kind {
1227                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1228                        _ => panic!("Embedded source must have Stream collection kind"),
1229                    };
1230                    let location_key = match metadata.location_id.root() {
1231                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1232                        _ => panic!("Embedded source must be on a process or cluster"),
1233                    };
1234                    D::register_embedded_stream_input(
1235                        &mut refcell_env.borrow_mut(),
1236                        location_key,
1237                        ident,
1238                        &element_type,
1239                    );
1240                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1241                    let element_type = match &metadata.collection_kind {
1242                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1243                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1244                    };
1245                    let location_key = match metadata.location_id.root() {
1246                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1247                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1248                    };
1249                    D::register_embedded_singleton_input(
1250                        &mut refcell_env.borrow_mut(),
1251                        location_key,
1252                        ident,
1253                        &element_type,
1254                    );
1255                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1256                    match state {
1257                        ClusterMembersState::Uninit => {
1258                            let at_location = metadata.location_id.root().clone();
1259                            let key = (at_location.clone(), location_id.key());
1260                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1261                                // First occurrence: call cluster_membership_stream and mark as Stream.
1262                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1263                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1264                                    &(),
1265                                );
1266                                *state = ClusterMembersState::Stream(expr.into());
1267                            } else {
1268                                // Already instantiated for this (at, target) pair: just tee.
1269                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1270                            }
1271                        }
1272                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1273                            panic!("cluster members already finalized");
1274                        }
1275                    }
1276                }
1277            },
1278            seen_tees,
1279            false,
1280        );
1281    }
1282
1283    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1284        self.transform_bottom_up(
1285            &mut |l| {
1286                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1287                    match instantiate_fn {
1288                        DebugInstantiate::Building => panic!("network not built"),
1289
1290                        DebugInstantiate::Finalized(finalized) => {
1291                            (finalized.connect_fn.take().unwrap())();
1292                        }
1293                    }
1294                }
1295            },
1296            &mut |n| {
1297                if let HydroNode::Network { instantiate_fn, .. }
1298                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1299                {
1300                    match instantiate_fn {
1301                        DebugInstantiate::Building => panic!("network not built"),
1302
1303                        DebugInstantiate::Finalized(finalized) => {
1304                            (finalized.connect_fn.take().unwrap())();
1305                        }
1306                    }
1307                }
1308            },
1309            seen_tees,
1310            false,
1311        );
1312    }
1313
1314    pub fn transform_bottom_up(
1315        &mut self,
1316        transform_root: &mut impl FnMut(&mut HydroRoot),
1317        transform_node: &mut impl FnMut(&mut HydroNode),
1318        seen_tees: &mut SeenSharedNodes,
1319        check_well_formed: bool,
1320    ) {
1321        self.transform_children(
1322            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1323            seen_tees,
1324        );
1325
1326        transform_root(self);
1327    }
1328
1329    pub fn transform_children(
1330        &mut self,
1331        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1332        seen_tees: &mut SeenSharedNodes,
1333    ) {
1334        match self {
1335            HydroRoot::ForEach { f, input, .. } => {
1336                f.transform_children(&mut transform, seen_tees);
1337                transform(input, seen_tees);
1338            }
1339            HydroRoot::SendExternal { input, .. }
1340            | HydroRoot::DestSink { input, .. }
1341            | HydroRoot::CycleSink { input, .. }
1342            | HydroRoot::EmbeddedOutput { input, .. }
1343            | HydroRoot::Null { input, .. } => {
1344                transform(input, seen_tees);
1345            }
1346        }
1347    }
1348
1349    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1350        match self {
1351            HydroRoot::ForEach {
1352                f,
1353                input,
1354                op_metadata,
1355            } => HydroRoot::ForEach {
1356                f: f.deep_clone(seen_tees),
1357                input: Box::new(input.deep_clone(seen_tees)),
1358                op_metadata: op_metadata.clone(),
1359            },
1360            HydroRoot::SendExternal {
1361                to_external_key,
1362                to_port_id,
1363                to_many,
1364                unpaired,
1365                serialize_fn,
1366                instantiate_fn,
1367                input,
1368                op_metadata,
1369            } => HydroRoot::SendExternal {
1370                to_external_key: *to_external_key,
1371                to_port_id: *to_port_id,
1372                to_many: *to_many,
1373                unpaired: *unpaired,
1374                serialize_fn: serialize_fn.clone(),
1375                instantiate_fn: instantiate_fn.clone(),
1376                input: Box::new(input.deep_clone(seen_tees)),
1377                op_metadata: op_metadata.clone(),
1378            },
1379            HydroRoot::DestSink {
1380                sink,
1381                input,
1382                op_metadata,
1383            } => HydroRoot::DestSink {
1384                sink: sink.clone(),
1385                input: Box::new(input.deep_clone(seen_tees)),
1386                op_metadata: op_metadata.clone(),
1387            },
1388            HydroRoot::CycleSink {
1389                cycle_id,
1390                input,
1391                op_metadata,
1392            } => HydroRoot::CycleSink {
1393                cycle_id: *cycle_id,
1394                input: Box::new(input.deep_clone(seen_tees)),
1395                op_metadata: op_metadata.clone(),
1396            },
1397            HydroRoot::EmbeddedOutput {
1398                ident,
1399                input,
1400                op_metadata,
1401            } => HydroRoot::EmbeddedOutput {
1402                ident: ident.clone(),
1403                input: Box::new(input.deep_clone(seen_tees)),
1404                op_metadata: op_metadata.clone(),
1405            },
1406            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1407                input: Box::new(input.deep_clone(seen_tees)),
1408                op_metadata: op_metadata.clone(),
1409            },
1410        }
1411    }
1412
1413    #[cfg(feature = "build")]
1414    pub fn emit(
1415        &mut self,
1416        graph_builders: &mut dyn DfirBuilder,
1417        seen_tees: &mut SeenSharedNodes,
1418        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1419        next_stmt_id: &mut StmtId,
1420        fold_hooked_idents: &mut HashSet<String>,
1421    ) {
1422        self.emit_core(
1423            &mut BuildersOrCallback::<
1424                fn(&mut HydroRoot, &mut StmtId),
1425                fn(&mut HydroNode, &mut StmtId),
1426            >::Builders(graph_builders),
1427            seen_tees,
1428            built_tees,
1429            next_stmt_id,
1430            fold_hooked_idents,
1431        );
1432    }
1433
1434    #[cfg(feature = "build")]
1435    pub fn emit_core(
1436        &mut self,
1437        builders_or_callback: &mut BuildersOrCallback<
1438            impl FnMut(&mut HydroRoot, &mut StmtId),
1439            impl FnMut(&mut HydroNode, &mut StmtId),
1440        >,
1441        seen_tees: &mut SeenSharedNodes,
1442        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1443        next_stmt_id: &mut StmtId,
1444        fold_hooked_idents: &mut HashSet<String>,
1445    ) {
1446        match self {
1447            HydroRoot::ForEach { f, input, .. } => {
1448                let input_ident = input.emit_core(
1449                    builders_or_callback,
1450                    seen_tees,
1451                    built_tees,
1452                    next_stmt_id,
1453                    fold_hooked_idents,
1454                );
1455
1456                match builders_or_callback {
1457                    BuildersOrCallback::Builders(graph_builders) => {
1458                        let mut ident_stack: Vec<syn::Ident> = Vec::new();
1459                        let mut singleton_access_counters: HashMap<*const RefCell<HydroNode>, u32> =
1460                            HashMap::new();
1461
1462                        // Look up each captured ref's ident from built_tees
1463                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
1464                            let HydroNode::Reference { inner, .. } = ref_node else {
1465                                panic!("singleton_refs should only contain HydroNode::Reference");
1466                            };
1467                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1468                            let idents = built_tees.get(&ptr).expect(
1469                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
1470                            );
1471                            ident_stack.push(idents[0].clone());
1472                        }
1473
1474                        let f_tokens =
1475                            f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
1476
1477                        graph_builders
1478                            .get_dfir_mut(&input.metadata().location_id)
1479                            .add_dfir(
1480                                parse_quote! {
1481                                    #input_ident -> for_each(#f_tokens);
1482                                },
1483                                None,
1484                                Some(&next_stmt_id.to_string()),
1485                            );
1486                    }
1487                    BuildersOrCallback::Callback(leaf_callback, _) => {
1488                        leaf_callback(self, next_stmt_id);
1489                    }
1490                }
1491
1492                let _ = next_stmt_id.get_and_increment();
1493            }
1494
1495            HydroRoot::SendExternal {
1496                serialize_fn,
1497                instantiate_fn,
1498                input,
1499                ..
1500            } => {
1501                let input_ident = input.emit_core(
1502                    builders_or_callback,
1503                    seen_tees,
1504                    built_tees,
1505                    next_stmt_id,
1506                    fold_hooked_idents,
1507                );
1508
1509                match builders_or_callback {
1510                    BuildersOrCallback::Builders(graph_builders) => {
1511                        let (sink_expr, _) = match instantiate_fn {
1512                            DebugInstantiate::Building => (
1513                                syn::parse_quote!(DUMMY_SINK),
1514                                syn::parse_quote!(DUMMY_SOURCE),
1515                            ),
1516
1517                            DebugInstantiate::Finalized(finalized) => {
1518                                (finalized.sink.clone(), finalized.source.clone())
1519                            }
1520                        };
1521
1522                        graph_builders.create_external_output(
1523                            &input.metadata().location_id,
1524                            sink_expr,
1525                            &input_ident,
1526                            serialize_fn.as_ref(),
1527                            *next_stmt_id,
1528                        );
1529                    }
1530                    BuildersOrCallback::Callback(leaf_callback, _) => {
1531                        leaf_callback(self, next_stmt_id);
1532                    }
1533                }
1534
1535                let _ = next_stmt_id.get_and_increment();
1536            }
1537
1538            HydroRoot::DestSink { sink, input, .. } => {
1539                let input_ident = input.emit_core(
1540                    builders_or_callback,
1541                    seen_tees,
1542                    built_tees,
1543                    next_stmt_id,
1544                    fold_hooked_idents,
1545                );
1546
1547                match builders_or_callback {
1548                    BuildersOrCallback::Builders(graph_builders) => {
1549                        graph_builders
1550                            .get_dfir_mut(&input.metadata().location_id)
1551                            .add_dfir(
1552                                parse_quote! {
1553                                    #input_ident -> dest_sink(#sink);
1554                                },
1555                                None,
1556                                Some(&next_stmt_id.to_string()),
1557                            );
1558                    }
1559                    BuildersOrCallback::Callback(leaf_callback, _) => {
1560                        leaf_callback(self, next_stmt_id);
1561                    }
1562                }
1563
1564                let _ = next_stmt_id.get_and_increment();
1565            }
1566
1567            HydroRoot::CycleSink {
1568                cycle_id, input, ..
1569            } => {
1570                let input_ident = input.emit_core(
1571                    builders_or_callback,
1572                    seen_tees,
1573                    built_tees,
1574                    next_stmt_id,
1575                    fold_hooked_idents,
1576                );
1577
1578                match builders_or_callback {
1579                    BuildersOrCallback::Builders(graph_builders) => {
1580                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1581                            CollectionKind::KeyedSingleton {
1582                                key_type,
1583                                value_type,
1584                                ..
1585                            }
1586                            | CollectionKind::KeyedStream {
1587                                key_type,
1588                                value_type,
1589                                ..
1590                            } => {
1591                                parse_quote!((#key_type, #value_type))
1592                            }
1593                            CollectionKind::Stream { element_type, .. }
1594                            | CollectionKind::Singleton { element_type, .. }
1595                            | CollectionKind::Optional { element_type, .. } => {
1596                                parse_quote!(#element_type)
1597                            }
1598                        };
1599
1600                        let cycle_id_ident = cycle_id.as_ident();
1601                        graph_builders
1602                            .get_dfir_mut(&input.metadata().location_id)
1603                            .add_dfir(
1604                                parse_quote! {
1605                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1606                                },
1607                                None,
1608                                None,
1609                            );
1610                    }
1611                    // No ID, no callback
1612                    BuildersOrCallback::Callback(_, _) => {}
1613                }
1614            }
1615
1616            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1617                let input_ident = input.emit_core(
1618                    builders_or_callback,
1619                    seen_tees,
1620                    built_tees,
1621                    next_stmt_id,
1622                    fold_hooked_idents,
1623                );
1624
1625                match builders_or_callback {
1626                    BuildersOrCallback::Builders(graph_builders) => {
1627                        graph_builders
1628                            .get_dfir_mut(&input.metadata().location_id)
1629                            .add_dfir(
1630                                parse_quote! {
1631                                    #input_ident -> for_each(&mut #ident);
1632                                },
1633                                None,
1634                                Some(&next_stmt_id.to_string()),
1635                            );
1636                    }
1637                    BuildersOrCallback::Callback(leaf_callback, _) => {
1638                        leaf_callback(self, next_stmt_id);
1639                    }
1640                }
1641
1642                let _ = next_stmt_id.get_and_increment();
1643            }
1644
1645            HydroRoot::Null { input, .. } => {
1646                let input_ident = input.emit_core(
1647                    builders_or_callback,
1648                    seen_tees,
1649                    built_tees,
1650                    next_stmt_id,
1651                    fold_hooked_idents,
1652                );
1653
1654                match builders_or_callback {
1655                    BuildersOrCallback::Builders(graph_builders) => {
1656                        graph_builders
1657                            .get_dfir_mut(&input.metadata().location_id)
1658                            .add_dfir(
1659                                parse_quote! {
1660                                    #input_ident -> for_each(|_| {});
1661                                },
1662                                None,
1663                                Some(&next_stmt_id.to_string()),
1664                            );
1665                    }
1666                    BuildersOrCallback::Callback(leaf_callback, _) => {
1667                        leaf_callback(self, next_stmt_id);
1668                    }
1669                }
1670
1671                let _ = next_stmt_id.get_and_increment();
1672            }
1673        }
1674    }
1675
1676    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1677        match self {
1678            HydroRoot::ForEach { op_metadata, .. }
1679            | HydroRoot::SendExternal { op_metadata, .. }
1680            | HydroRoot::DestSink { op_metadata, .. }
1681            | HydroRoot::CycleSink { op_metadata, .. }
1682            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1683            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1684        }
1685    }
1686
1687    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1688        match self {
1689            HydroRoot::ForEach { op_metadata, .. }
1690            | HydroRoot::SendExternal { op_metadata, .. }
1691            | HydroRoot::DestSink { op_metadata, .. }
1692            | HydroRoot::CycleSink { op_metadata, .. }
1693            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1694            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1695        }
1696    }
1697
1698    pub fn input(&self) -> &HydroNode {
1699        match self {
1700            HydroRoot::ForEach { input, .. }
1701            | HydroRoot::SendExternal { input, .. }
1702            | HydroRoot::DestSink { input, .. }
1703            | HydroRoot::CycleSink { input, .. }
1704            | HydroRoot::EmbeddedOutput { input, .. }
1705            | HydroRoot::Null { input, .. } => input,
1706        }
1707    }
1708
1709    pub fn input_metadata(&self) -> &HydroIrMetadata {
1710        self.input().metadata()
1711    }
1712
1713    pub fn print_root(&self) -> String {
1714        match self {
1715            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1716            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1717            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1718            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1719            HydroRoot::EmbeddedOutput { ident, .. } => {
1720                format!("EmbeddedOutput({})", ident)
1721            }
1722            HydroRoot::Null { .. } => "Null".to_owned(),
1723        }
1724    }
1725
1726    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1727        match self {
1728            HydroRoot::ForEach { f, .. } => {
1729                transform(&mut f.expr);
1730            }
1731            HydroRoot::DestSink { sink, .. } => {
1732                transform(sink);
1733            }
1734            HydroRoot::SendExternal { .. }
1735            | HydroRoot::CycleSink { .. }
1736            | HydroRoot::EmbeddedOutput { .. }
1737            | HydroRoot::Null { .. } => {}
1738        }
1739    }
1740}
1741
1742#[cfg(feature = "build")]
1743fn tick_of(loc: &LocationId) -> Option<ClockId> {
1744    match loc {
1745        LocationId::Tick(id, _) => Some(*id),
1746        LocationId::Atomic(inner) => tick_of(inner),
1747        _ => None,
1748    }
1749}
1750
1751#[cfg(feature = "build")]
1752fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1753    match loc {
1754        LocationId::Tick(id, inner) => {
1755            *id = uf_find(uf, *id);
1756            remap_location(inner, uf);
1757        }
1758        LocationId::Atomic(inner) => {
1759            remap_location(inner, uf);
1760        }
1761        LocationId::Process(_) | LocationId::Cluster(_) => {}
1762    }
1763}
1764
1765#[cfg(feature = "build")]
1766fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1767    let p = *parent.get(&x).unwrap_or(&x);
1768    if p == x {
1769        return x;
1770    }
1771    let root = uf_find(parent, p);
1772    parent.insert(x, root);
1773    root
1774}
1775
1776#[cfg(feature = "build")]
1777fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1778    let ra = uf_find(parent, a);
1779    let rb = uf_find(parent, b);
1780    if ra != rb {
1781        parent.insert(ra, rb);
1782    }
1783}
1784
1785/// Traverse the IR to build a union-find that unifies tick IDs connected
1786/// through `Batch` and `YieldConcat` nodes at atomic boundaries, then
1787/// rewrite all `LocationId`s to use the representative tick ID.
1788#[cfg(feature = "build")]
1789pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1790    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1791
1792    // Pass 1: collect unifications.
1793    transform_bottom_up(
1794        ir,
1795        &mut |_| {},
1796        &mut |node: &mut HydroNode| match node {
1797            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
1798                if let (Some(a), Some(b)) = (
1799                    tick_of(&inner.metadata().location_id),
1800                    tick_of(&metadata.location_id),
1801                ) {
1802                    uf_union(&mut uf, a, b);
1803                }
1804            }
1805            HydroNode::Chain {
1806                first,
1807                second,
1808                metadata,
1809            }
1810            | HydroNode::ChainFirst {
1811                first,
1812                second,
1813                metadata,
1814            }
1815            | HydroNode::MergeOrdered {
1816                first,
1817                second,
1818                metadata,
1819            } => {
1820                if let (Some(a), Some(b)) = (
1821                    tick_of(&first.metadata().location_id),
1822                    tick_of(&metadata.location_id),
1823                ) {
1824                    uf_union(&mut uf, a, b);
1825                }
1826                if let (Some(a), Some(b)) = (
1827                    tick_of(&second.metadata().location_id),
1828                    tick_of(&metadata.location_id),
1829                ) {
1830                    uf_union(&mut uf, a, b);
1831                }
1832            }
1833            _ => {}
1834        },
1835        false,
1836    );
1837
1838    // Pass 2: rewrite all LocationIds.
1839    transform_bottom_up(
1840        ir,
1841        &mut |_| {},
1842        &mut |node: &mut HydroNode| {
1843            remap_location(&mut node.metadata_mut().location_id, &mut uf);
1844        },
1845        false,
1846    );
1847}
1848
1849#[cfg(feature = "build")]
1850pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1851    let mut builders = SecondaryMap::new();
1852    let mut seen_tees = HashMap::new();
1853    let mut built_tees = HashMap::new();
1854    let mut next_stmt_id = StmtId::default();
1855    let mut fold_hooked_idents = HashSet::new();
1856    for leaf in ir {
1857        leaf.emit(
1858            &mut builders,
1859            &mut seen_tees,
1860            &mut built_tees,
1861            &mut next_stmt_id,
1862            &mut fold_hooked_idents,
1863        );
1864    }
1865    builders
1866}
1867
1868#[cfg(feature = "build")]
1869pub fn traverse_dfir(
1870    ir: &mut [HydroRoot],
1871    transform_root: impl FnMut(&mut HydroRoot, &mut StmtId),
1872    transform_node: impl FnMut(&mut HydroNode, &mut StmtId),
1873) {
1874    let mut seen_tees = HashMap::new();
1875    let mut built_tees = HashMap::new();
1876    let mut next_stmt_id = StmtId::default();
1877    let mut fold_hooked_idents = HashSet::new();
1878    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1879    ir.iter_mut().for_each(|leaf| {
1880        leaf.emit_core(
1881            &mut callback,
1882            &mut seen_tees,
1883            &mut built_tees,
1884            &mut next_stmt_id,
1885            &mut fold_hooked_idents,
1886        );
1887    });
1888}
1889
1890pub fn transform_bottom_up(
1891    ir: &mut [HydroRoot],
1892    transform_root: &mut impl FnMut(&mut HydroRoot),
1893    transform_node: &mut impl FnMut(&mut HydroNode),
1894    check_well_formed: bool,
1895) {
1896    let mut seen_tees = HashMap::new();
1897    ir.iter_mut().for_each(|leaf| {
1898        leaf.transform_bottom_up(
1899            transform_root,
1900            transform_node,
1901            &mut seen_tees,
1902            check_well_formed,
1903        );
1904    });
1905}
1906
1907pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1908    let mut seen_tees = HashMap::new();
1909    ir.iter()
1910        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1911        .collect()
1912}
1913
1914type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1915thread_local! {
1916    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1917    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1918    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1919    /// on subsequent encounters, preventing infinite loops.
1920    static SERIALIZED_SHARED: PrintedTees
1921        = const { RefCell::new(None) };
1922}
1923
1924pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1925    PRINTED_TEES.with(|printed_tees| {
1926        let mut printed_tees_mut = printed_tees.borrow_mut();
1927        *printed_tees_mut = Some((0, HashMap::new()));
1928        drop(printed_tees_mut);
1929
1930        let ret = f();
1931
1932        let mut printed_tees_mut = printed_tees.borrow_mut();
1933        *printed_tees_mut = None;
1934
1935        ret
1936    })
1937}
1938
1939/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1940/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1941/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1942/// back-reference.  The tracking state is restored when `f` returns or panics.
1943pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1944    let _guard = SerializedSharedGuard::enter();
1945    f()
1946}
1947
1948/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
1949/// making `serialize_dedup_shared` re-entrant and panic-safe.
1950struct SerializedSharedGuard {
1951    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1952}
1953
1954impl SerializedSharedGuard {
1955    fn enter() -> Self {
1956        let previous = SERIALIZED_SHARED.with(|cell| {
1957            let mut guard = cell.borrow_mut();
1958            guard.replace((0, HashMap::new()))
1959        });
1960        Self { previous }
1961    }
1962}
1963
1964impl Drop for SerializedSharedGuard {
1965    fn drop(&mut self) {
1966        SERIALIZED_SHARED.with(|cell| {
1967            *cell.borrow_mut() = self.previous.take();
1968        });
1969    }
1970}
1971
1972pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1973
1974impl serde::Serialize for SharedNode {
1975    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
1976    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
1977    /// same subtree every time and, if the graph ever contains a cycle, loop
1978    /// forever.
1979    ///
1980    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
1981    /// integer id.  The first time we see a pointer we assign it the next id and
1982    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
1983    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
1984    /// recursion.  Requires an active `serialize_dedup_shared` scope.
1985    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1986        SERIALIZED_SHARED.with(|cell| {
1987            let mut guard = cell.borrow_mut();
1988            // (next_id, pointer → assigned_id)
1989            let state = guard.as_mut().ok_or_else(|| {
1990                serde::ser::Error::custom(
1991                    "SharedNode serialization requires an active serialize_dedup_shared scope",
1992                )
1993            })?;
1994            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1995
1996            if let Some(&id) = state.1.get(&ptr) {
1997                drop(guard);
1998                use serde::ser::SerializeMap;
1999                let mut map = serializer.serialize_map(Some(1))?;
2000                map.serialize_entry("$shared_ref", &id)?;
2001                map.end()
2002            } else {
2003                let id = state.0;
2004                state.0 += 1;
2005                state.1.insert(ptr, id);
2006                drop(guard);
2007
2008                use serde::ser::SerializeMap;
2009                let mut map = serializer.serialize_map(Some(2))?;
2010                map.serialize_entry("$shared", &id)?;
2011                map.serialize_entry("node", &*self.0.borrow())?;
2012                map.end()
2013            }
2014        })
2015    }
2016}
2017
2018impl SharedNode {
2019    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2020        Rc::as_ptr(&self.0)
2021    }
2022}
2023
2024impl Debug for SharedNode {
2025    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2026        PRINTED_TEES.with(|printed_tees| {
2027            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2028            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2029
2030            if let Some(printed_tees_mut) = printed_tees_mut {
2031                if let Some(existing) = printed_tees_mut
2032                    .1
2033                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2034                {
2035                    write!(f, "<shared {}>", existing)
2036                } else {
2037                    let next_id = printed_tees_mut.0;
2038                    printed_tees_mut.0 += 1;
2039                    printed_tees_mut
2040                        .1
2041                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2042                    drop(printed_tees_mut_borrow);
2043                    write!(f, "<shared {}>: ", next_id)?;
2044                    Debug::fmt(&self.0.borrow(), f)
2045                }
2046            } else {
2047                drop(printed_tees_mut_borrow);
2048                write!(f, "<shared>: ")?;
2049                Debug::fmt(&self.0.borrow(), f)
2050            }
2051        })
2052    }
2053}
2054
2055impl Hash for SharedNode {
2056    fn hash<H: Hasher>(&self, state: &mut H) {
2057        self.0.borrow_mut().hash(state);
2058    }
2059}
2060
2061#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2062pub enum BoundKind {
2063    Unbounded,
2064    Bounded,
2065}
2066
2067#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2068pub enum StreamOrder {
2069    NoOrder,
2070    TotalOrder,
2071}
2072
2073#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2074pub enum StreamRetry {
2075    AtLeastOnce,
2076    ExactlyOnce,
2077}
2078
2079#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2080pub enum KeyedSingletonBoundKind {
2081    Unbounded,
2082    MonotonicValue,
2083    BoundedValue,
2084    Bounded,
2085}
2086
2087#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2088pub enum SingletonBoundKind {
2089    Unbounded,
2090    Monotonic,
2091    Bounded,
2092}
2093
2094#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2095pub enum CollectionKind {
2096    Stream {
2097        bound: BoundKind,
2098        order: StreamOrder,
2099        retry: StreamRetry,
2100        element_type: DebugType,
2101    },
2102    Singleton {
2103        bound: SingletonBoundKind,
2104        element_type: DebugType,
2105    },
2106    Optional {
2107        bound: BoundKind,
2108        element_type: DebugType,
2109    },
2110    KeyedStream {
2111        bound: BoundKind,
2112        value_order: StreamOrder,
2113        value_retry: StreamRetry,
2114        key_type: DebugType,
2115        value_type: DebugType,
2116    },
2117    KeyedSingleton {
2118        bound: KeyedSingletonBoundKind,
2119        key_type: DebugType,
2120        value_type: DebugType,
2121    },
2122}
2123
2124impl CollectionKind {
2125    pub fn is_bounded(&self) -> bool {
2126        matches!(
2127            self,
2128            CollectionKind::Stream {
2129                bound: BoundKind::Bounded,
2130                ..
2131            } | CollectionKind::Singleton {
2132                bound: SingletonBoundKind::Bounded,
2133                ..
2134            } | CollectionKind::Optional {
2135                bound: BoundKind::Bounded,
2136                ..
2137            } | CollectionKind::KeyedStream {
2138                bound: BoundKind::Bounded,
2139                ..
2140            } | CollectionKind::KeyedSingleton {
2141                bound: KeyedSingletonBoundKind::Bounded,
2142                ..
2143            }
2144        )
2145    }
2146}
2147
2148#[derive(Clone, serde::Serialize)]
2149pub struct HydroIrMetadata {
2150    pub location_id: LocationId,
2151    pub collection_kind: CollectionKind,
2152    pub consistency: Option<ClusterConsistency>,
2153    pub cardinality: Option<usize>,
2154    pub tag: Option<String>,
2155    pub op: HydroIrOpMetadata,
2156}
2157
2158// HydroIrMetadata shouldn't be used to hash or compare
2159impl Hash for HydroIrMetadata {
2160    fn hash<H: Hasher>(&self, _: &mut H) {}
2161}
2162
2163impl PartialEq for HydroIrMetadata {
2164    fn eq(&self, _: &Self) -> bool {
2165        true
2166    }
2167}
2168
2169impl Eq for HydroIrMetadata {}
2170
2171impl Debug for HydroIrMetadata {
2172    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2173        f.debug_struct("HydroIrMetadata")
2174            .field("location_id", &self.location_id)
2175            .field("collection_kind", &self.collection_kind)
2176            .finish()
2177    }
2178}
2179
2180/// Metadata that is specific to the operator itself, rather than its outputs.
2181/// This is available on _both_ inner nodes and roots.
2182#[derive(Clone, serde::Serialize)]
2183pub struct HydroIrOpMetadata {
2184    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2185    pub backtrace: Backtrace,
2186    pub cpu_usage: Option<f64>,
2187    pub network_recv_cpu_usage: Option<f64>,
2188    pub id: Option<usize>,
2189}
2190
2191impl HydroIrOpMetadata {
2192    #[expect(
2193        clippy::new_without_default,
2194        reason = "explicit calls to new ensure correct backtrace bounds"
2195    )]
2196    pub fn new() -> HydroIrOpMetadata {
2197        Self::new_with_skip(1)
2198    }
2199
2200    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2201        HydroIrOpMetadata {
2202            backtrace: Backtrace::get_backtrace(2 + skip_count),
2203            cpu_usage: None,
2204            network_recv_cpu_usage: None,
2205            id: None,
2206        }
2207    }
2208}
2209
2210impl Debug for HydroIrOpMetadata {
2211    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2212        f.debug_struct("HydroIrOpMetadata").finish()
2213    }
2214}
2215
2216impl Hash for HydroIrOpMetadata {
2217    fn hash<H: Hasher>(&self, _: &mut H) {}
2218}
2219
2220/// An intermediate node in a Hydro graph, which consumes data
2221/// from upstream nodes and emits data to downstream nodes.
2222#[derive(Debug, Hash, serde::Serialize)]
2223pub enum HydroNode {
2224    Placeholder,
2225
2226    /// Manually "casts" between two different collection kinds.
2227    ///
2228    /// Using this IR node requires special care, since it bypasses many of Hydro's core
2229    /// correctness checks. In particular, the user must ensure that every possible
2230    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
2231    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
2232    /// collection. This ensures that the simulator does not miss any possible outputs.
2233    Cast {
2234        inner: Box<HydroNode>,
2235        metadata: HydroIrMetadata,
2236    },
2237
2238    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
2239    /// interpretation of the input stream.
2240    ///
2241    /// In production, this simply passes through the input, but in simulation, this operator
2242    /// explicitly selects a randomized interpretation.
2243    ObserveNonDet {
2244        inner: Box<HydroNode>,
2245        trusted: bool, // if true, we do not need to simulate non-determinism
2246        metadata: HydroIrMetadata,
2247    },
2248
2249    Source {
2250        source: HydroSource,
2251        metadata: HydroIrMetadata,
2252    },
2253
2254    SingletonSource {
2255        value: DebugExpr,
2256        first_tick_only: bool,
2257        metadata: HydroIrMetadata,
2258    },
2259
2260    CycleSource {
2261        cycle_id: CycleId,
2262        metadata: HydroIrMetadata,
2263    },
2264
2265    Tee {
2266        inner: SharedNode,
2267        metadata: HydroIrMetadata,
2268    },
2269
2270    /// A reference materialization point. Wraps a SharedNode so that:
2271    /// - The pipe output delivers data to one consumer
2272    /// - `#var` references can borrow the value from the slot
2273    ///
2274    /// In DFIR codegen, emits `ident = inner_ident -> singleton()` or `-> optional()` or
2275    /// `-> handoff()` depending on `kind`.
2276    ///
2277    /// Uses the same `built_tees` dedup pattern as `Tee`.
2278    Reference {
2279        inner: SharedNode,
2280        kind: crate::handoff_ref::HandoffRefKind,
2281        metadata: HydroIrMetadata,
2282    },
2283
2284    Partition {
2285        inner: SharedNode,
2286        f: ClosureExpr,
2287        is_true: bool,
2288        metadata: HydroIrMetadata,
2289    },
2290
2291    BeginAtomic {
2292        inner: Box<HydroNode>,
2293        metadata: HydroIrMetadata,
2294    },
2295
2296    EndAtomic {
2297        inner: Box<HydroNode>,
2298        metadata: HydroIrMetadata,
2299    },
2300
2301    Batch {
2302        inner: Box<HydroNode>,
2303        metadata: HydroIrMetadata,
2304    },
2305
2306    YieldConcat {
2307        inner: Box<HydroNode>,
2308        metadata: HydroIrMetadata,
2309    },
2310
2311    Chain {
2312        first: Box<HydroNode>,
2313        second: Box<HydroNode>,
2314        metadata: HydroIrMetadata,
2315    },
2316
2317    MergeOrdered {
2318        first: Box<HydroNode>,
2319        second: Box<HydroNode>,
2320        metadata: HydroIrMetadata,
2321    },
2322
2323    ChainFirst {
2324        first: Box<HydroNode>,
2325        second: Box<HydroNode>,
2326        metadata: HydroIrMetadata,
2327    },
2328
2329    CrossProduct {
2330        left: Box<HydroNode>,
2331        right: Box<HydroNode>,
2332        metadata: HydroIrMetadata,
2333    },
2334
2335    CrossSingleton {
2336        left: Box<HydroNode>,
2337        right: Box<HydroNode>,
2338        metadata: HydroIrMetadata,
2339    },
2340
2341    Join {
2342        left: Box<HydroNode>,
2343        right: Box<HydroNode>,
2344        metadata: HydroIrMetadata,
2345    },
2346
2347    /// Asymmetric join where the right (build) side is bounded.
2348    /// The build side is accumulated (stratum-delayed) into a hash table,
2349    /// then the left (probe) side streams through preserving its ordering.
2350    JoinHalf {
2351        left: Box<HydroNode>,
2352        right: Box<HydroNode>,
2353        metadata: HydroIrMetadata,
2354    },
2355
2356    Difference {
2357        pos: Box<HydroNode>,
2358        neg: Box<HydroNode>,
2359        metadata: HydroIrMetadata,
2360    },
2361
2362    AntiJoin {
2363        pos: Box<HydroNode>,
2364        neg: Box<HydroNode>,
2365        metadata: HydroIrMetadata,
2366    },
2367
2368    ResolveFutures {
2369        input: Box<HydroNode>,
2370        metadata: HydroIrMetadata,
2371    },
2372    ResolveFuturesBlocking {
2373        input: Box<HydroNode>,
2374        metadata: HydroIrMetadata,
2375    },
2376    ResolveFuturesOrdered {
2377        input: Box<HydroNode>,
2378        metadata: HydroIrMetadata,
2379    },
2380
2381    Map {
2382        f: ClosureExpr,
2383        input: Box<HydroNode>,
2384        metadata: HydroIrMetadata,
2385    },
2386    FlatMap {
2387        f: ClosureExpr,
2388        input: Box<HydroNode>,
2389        metadata: HydroIrMetadata,
2390    },
2391    FlatMapStreamBlocking {
2392        f: ClosureExpr,
2393        input: Box<HydroNode>,
2394        metadata: HydroIrMetadata,
2395    },
2396    Filter {
2397        f: ClosureExpr,
2398        input: Box<HydroNode>,
2399        metadata: HydroIrMetadata,
2400    },
2401    FilterMap {
2402        f: ClosureExpr,
2403        input: Box<HydroNode>,
2404        metadata: HydroIrMetadata,
2405    },
2406
2407    DeferTick {
2408        input: Box<HydroNode>,
2409        metadata: HydroIrMetadata,
2410    },
2411    Enumerate {
2412        input: Box<HydroNode>,
2413        metadata: HydroIrMetadata,
2414    },
2415    Inspect {
2416        f: ClosureExpr,
2417        input: Box<HydroNode>,
2418        metadata: HydroIrMetadata,
2419    },
2420
2421    Unique {
2422        input: Box<HydroNode>,
2423        metadata: HydroIrMetadata,
2424    },
2425
2426    Sort {
2427        input: Box<HydroNode>,
2428        metadata: HydroIrMetadata,
2429    },
2430    Fold {
2431        init: ClosureExpr,
2432        acc: ClosureExpr,
2433        input: Box<HydroNode>,
2434        metadata: HydroIrMetadata,
2435    },
2436
2437    Scan {
2438        init: ClosureExpr,
2439        acc: ClosureExpr,
2440        input: Box<HydroNode>,
2441        metadata: HydroIrMetadata,
2442    },
2443    ScanAsyncBlocking {
2444        init: ClosureExpr,
2445        acc: ClosureExpr,
2446        input: Box<HydroNode>,
2447        metadata: HydroIrMetadata,
2448    },
2449    FoldKeyed {
2450        init: ClosureExpr,
2451        acc: ClosureExpr,
2452        input: Box<HydroNode>,
2453        metadata: HydroIrMetadata,
2454    },
2455
2456    Reduce {
2457        f: ClosureExpr,
2458        input: Box<HydroNode>,
2459        metadata: HydroIrMetadata,
2460    },
2461    ReduceKeyed {
2462        f: ClosureExpr,
2463        input: Box<HydroNode>,
2464        metadata: HydroIrMetadata,
2465    },
2466    ReduceKeyedWatermark {
2467        f: ClosureExpr,
2468        input: Box<HydroNode>,
2469        watermark: Box<HydroNode>,
2470        metadata: HydroIrMetadata,
2471    },
2472
2473    Network {
2474        name: Option<String>,
2475        networking_info: crate::networking::NetworkingInfo,
2476        serialize_fn: Option<DebugExpr>,
2477        instantiate_fn: DebugInstantiate,
2478        deserialize_fn: Option<DebugExpr>,
2479        input: Box<HydroNode>,
2480        metadata: HydroIrMetadata,
2481    },
2482
2483    ExternalInput {
2484        from_external_key: LocationKey,
2485        from_port_id: ExternalPortId,
2486        from_many: bool,
2487        codec_type: DebugType,
2488        #[serde(skip)]
2489        port_hint: NetworkHint,
2490        instantiate_fn: DebugInstantiate,
2491        deserialize_fn: Option<DebugExpr>,
2492        metadata: HydroIrMetadata,
2493    },
2494
2495    Counter {
2496        tag: String,
2497        duration: DebugExpr,
2498        prefix: String,
2499        input: Box<HydroNode>,
2500        metadata: HydroIrMetadata,
2501    },
2502
2503    AssertIsConsistent {
2504        inner: Box<HydroNode>,
2505        trusted: bool,
2506        metadata: HydroIrMetadata,
2507    },
2508
2509    UnboundSingleton {
2510        inner: Box<HydroNode>,
2511        metadata: HydroIrMetadata,
2512    },
2513}
2514
2515pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2516pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2517
2518impl HydroNode {
2519    pub fn transform_bottom_up(
2520        &mut self,
2521        transform: &mut impl FnMut(&mut HydroNode),
2522        seen_tees: &mut SeenSharedNodes,
2523        check_well_formed: bool,
2524    ) {
2525        self.transform_children(
2526            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2527            seen_tees,
2528        );
2529
2530        transform(self);
2531
2532        let self_location = self.metadata().location_id.root();
2533
2534        if check_well_formed {
2535            match &*self {
2536                HydroNode::Network { .. } => {}
2537                _ => {
2538                    self.input_metadata().iter().for_each(|i| {
2539                        if i.location_id.root() != self_location {
2540                            panic!(
2541                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2542                                i,
2543                                i.location_id.root(),
2544                                self,
2545                                self_location
2546                            )
2547                        }
2548                    });
2549                }
2550            }
2551        }
2552    }
2553
2554    #[inline(always)]
2555    pub fn transform_children(
2556        &mut self,
2557        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2558        seen_tees: &mut SeenSharedNodes,
2559    ) {
2560        match self {
2561            HydroNode::Placeholder => {
2562                panic!();
2563            }
2564
2565            HydroNode::Source { .. }
2566            | HydroNode::SingletonSource { .. }
2567            | HydroNode::CycleSource { .. }
2568            | HydroNode::ExternalInput { .. } => {}
2569
2570            HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2571                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2572                    *inner = SharedNode(transformed.clone());
2573                } else {
2574                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2575                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2576                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2577                    transform(&mut orig, seen_tees);
2578                    *transformed_cell.borrow_mut() = orig;
2579                    *inner = SharedNode(transformed_cell);
2580                }
2581            }
2582
2583            HydroNode::Partition { inner, f, .. } => {
2584                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2585                    *inner = SharedNode(transformed.clone());
2586                } else {
2587                    f.transform_children(&mut transform, seen_tees);
2588                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2589                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2590                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2591                    transform(&mut orig, seen_tees);
2592                    *transformed_cell.borrow_mut() = orig;
2593                    *inner = SharedNode(transformed_cell);
2594                }
2595            }
2596
2597            HydroNode::Cast { inner, .. }
2598            | HydroNode::ObserveNonDet { inner, .. }
2599            | HydroNode::BeginAtomic { inner, .. }
2600            | HydroNode::EndAtomic { inner, .. }
2601            | HydroNode::Batch { inner, .. }
2602            | HydroNode::YieldConcat { inner, .. }
2603            | HydroNode::UnboundSingleton { inner, .. }
2604            | HydroNode::AssertIsConsistent { inner, .. } => {
2605                transform(inner.as_mut(), seen_tees);
2606            }
2607
2608            HydroNode::Chain { first, second, .. } => {
2609                transform(first.as_mut(), seen_tees);
2610                transform(second.as_mut(), seen_tees);
2611            }
2612
2613            HydroNode::MergeOrdered { first, second, .. } => {
2614                transform(first.as_mut(), seen_tees);
2615                transform(second.as_mut(), seen_tees);
2616            }
2617
2618            HydroNode::ChainFirst { first, second, .. } => {
2619                transform(first.as_mut(), seen_tees);
2620                transform(second.as_mut(), seen_tees);
2621            }
2622
2623            HydroNode::CrossSingleton { left, right, .. }
2624            | HydroNode::CrossProduct { left, right, .. }
2625            | HydroNode::Join { left, right, .. }
2626            | HydroNode::JoinHalf { left, right, .. } => {
2627                transform(left.as_mut(), seen_tees);
2628                transform(right.as_mut(), seen_tees);
2629            }
2630
2631            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2632                transform(pos.as_mut(), seen_tees);
2633                transform(neg.as_mut(), seen_tees);
2634            }
2635
2636            HydroNode::Map { f, input, .. } => {
2637                f.transform_children(&mut transform, seen_tees);
2638                transform(input.as_mut(), seen_tees);
2639            }
2640            HydroNode::FlatMap { f, input, .. }
2641            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2642            | HydroNode::Filter { f, input, .. }
2643            | HydroNode::FilterMap { f, input, .. }
2644            | HydroNode::Inspect { f, input, .. }
2645            | HydroNode::Reduce { f, input, .. }
2646            | HydroNode::ReduceKeyed { f, input, .. } => {
2647                f.transform_children(&mut transform, seen_tees);
2648                transform(input.as_mut(), seen_tees);
2649            }
2650            HydroNode::ReduceKeyedWatermark {
2651                f,
2652                input,
2653                watermark,
2654                ..
2655            } => {
2656                f.transform_children(&mut transform, seen_tees);
2657                transform(input.as_mut(), seen_tees);
2658                transform(watermark.as_mut(), seen_tees);
2659            }
2660            HydroNode::Fold {
2661                init, acc, input, ..
2662            }
2663            | HydroNode::Scan {
2664                init, acc, input, ..
2665            }
2666            | HydroNode::ScanAsyncBlocking {
2667                init, acc, input, ..
2668            }
2669            | HydroNode::FoldKeyed {
2670                init, acc, input, ..
2671            } => {
2672                init.transform_children(&mut transform, seen_tees);
2673                acc.transform_children(&mut transform, seen_tees);
2674                transform(input.as_mut(), seen_tees);
2675            }
2676            HydroNode::ResolveFutures { input, .. }
2677            | HydroNode::ResolveFuturesBlocking { input, .. }
2678            | HydroNode::ResolveFuturesOrdered { input, .. }
2679            | HydroNode::Sort { input, .. }
2680            | HydroNode::DeferTick { input, .. }
2681            | HydroNode::Enumerate { input, .. }
2682            | HydroNode::Unique { input, .. }
2683            | HydroNode::Network { input, .. }
2684            | HydroNode::Counter { input, .. } => {
2685                transform(input.as_mut(), seen_tees);
2686            }
2687        }
2688    }
2689
2690    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2691        match self {
2692            HydroNode::Placeholder => HydroNode::Placeholder,
2693            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2694                inner: Box::new(inner.deep_clone(seen_tees)),
2695                metadata: metadata.clone(),
2696            },
2697            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2698                inner: Box::new(inner.deep_clone(seen_tees)),
2699                metadata: metadata.clone(),
2700            },
2701            HydroNode::ObserveNonDet {
2702                inner,
2703                trusted,
2704                metadata,
2705            } => HydroNode::ObserveNonDet {
2706                inner: Box::new(inner.deep_clone(seen_tees)),
2707                trusted: *trusted,
2708                metadata: metadata.clone(),
2709            },
2710            HydroNode::AssertIsConsistent {
2711                inner,
2712                trusted,
2713                metadata,
2714            } => HydroNode::AssertIsConsistent {
2715                inner: Box::new(inner.deep_clone(seen_tees)),
2716                trusted: *trusted,
2717                metadata: metadata.clone(),
2718            },
2719            HydroNode::Source { source, metadata } => HydroNode::Source {
2720                source: source.clone(),
2721                metadata: metadata.clone(),
2722            },
2723            HydroNode::SingletonSource {
2724                value,
2725                first_tick_only,
2726                metadata,
2727            } => HydroNode::SingletonSource {
2728                value: value.clone(),
2729                first_tick_only: *first_tick_only,
2730                metadata: metadata.clone(),
2731            },
2732            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2733                cycle_id: *cycle_id,
2734                metadata: metadata.clone(),
2735            },
2736            HydroNode::Tee { inner, metadata }
2737            | HydroNode::Reference {
2738                inner, metadata, ..
2739            } => {
2740                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2741                    SharedNode(transformed.clone())
2742                } else {
2743                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2744                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2745                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2746                    *new_rc.borrow_mut() = cloned;
2747                    SharedNode(new_rc)
2748                };
2749                if let HydroNode::Reference { kind, .. } = self {
2750                    HydroNode::Reference {
2751                        inner: cloned_inner,
2752                        kind: *kind,
2753                        metadata: metadata.clone(),
2754                    }
2755                } else {
2756                    HydroNode::Tee {
2757                        inner: cloned_inner,
2758                        metadata: metadata.clone(),
2759                    }
2760                }
2761            }
2762            HydroNode::Partition {
2763                inner,
2764                f,
2765                is_true,
2766                metadata,
2767            } => {
2768                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2769                    HydroNode::Partition {
2770                        inner: SharedNode(transformed.clone()),
2771                        f: f.deep_clone(seen_tees),
2772                        is_true: *is_true,
2773                        metadata: metadata.clone(),
2774                    }
2775                } else {
2776                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2777                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2778                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2779                    *new_rc.borrow_mut() = cloned;
2780                    HydroNode::Partition {
2781                        inner: SharedNode(new_rc),
2782                        f: f.deep_clone(seen_tees),
2783                        is_true: *is_true,
2784                        metadata: metadata.clone(),
2785                    }
2786                }
2787            }
2788            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2789                inner: Box::new(inner.deep_clone(seen_tees)),
2790                metadata: metadata.clone(),
2791            },
2792            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2793                inner: Box::new(inner.deep_clone(seen_tees)),
2794                metadata: metadata.clone(),
2795            },
2796            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2797                inner: Box::new(inner.deep_clone(seen_tees)),
2798                metadata: metadata.clone(),
2799            },
2800            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2801                inner: Box::new(inner.deep_clone(seen_tees)),
2802                metadata: metadata.clone(),
2803            },
2804            HydroNode::Chain {
2805                first,
2806                second,
2807                metadata,
2808            } => HydroNode::Chain {
2809                first: Box::new(first.deep_clone(seen_tees)),
2810                second: Box::new(second.deep_clone(seen_tees)),
2811                metadata: metadata.clone(),
2812            },
2813            HydroNode::MergeOrdered {
2814                first,
2815                second,
2816                metadata,
2817            } => HydroNode::MergeOrdered {
2818                first: Box::new(first.deep_clone(seen_tees)),
2819                second: Box::new(second.deep_clone(seen_tees)),
2820                metadata: metadata.clone(),
2821            },
2822            HydroNode::ChainFirst {
2823                first,
2824                second,
2825                metadata,
2826            } => HydroNode::ChainFirst {
2827                first: Box::new(first.deep_clone(seen_tees)),
2828                second: Box::new(second.deep_clone(seen_tees)),
2829                metadata: metadata.clone(),
2830            },
2831            HydroNode::CrossProduct {
2832                left,
2833                right,
2834                metadata,
2835            } => HydroNode::CrossProduct {
2836                left: Box::new(left.deep_clone(seen_tees)),
2837                right: Box::new(right.deep_clone(seen_tees)),
2838                metadata: metadata.clone(),
2839            },
2840            HydroNode::CrossSingleton {
2841                left,
2842                right,
2843                metadata,
2844            } => HydroNode::CrossSingleton {
2845                left: Box::new(left.deep_clone(seen_tees)),
2846                right: Box::new(right.deep_clone(seen_tees)),
2847                metadata: metadata.clone(),
2848            },
2849            HydroNode::Join {
2850                left,
2851                right,
2852                metadata,
2853            } => HydroNode::Join {
2854                left: Box::new(left.deep_clone(seen_tees)),
2855                right: Box::new(right.deep_clone(seen_tees)),
2856                metadata: metadata.clone(),
2857            },
2858            HydroNode::JoinHalf {
2859                left,
2860                right,
2861                metadata,
2862            } => HydroNode::JoinHalf {
2863                left: Box::new(left.deep_clone(seen_tees)),
2864                right: Box::new(right.deep_clone(seen_tees)),
2865                metadata: metadata.clone(),
2866            },
2867            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2868                pos: Box::new(pos.deep_clone(seen_tees)),
2869                neg: Box::new(neg.deep_clone(seen_tees)),
2870                metadata: metadata.clone(),
2871            },
2872            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2873                pos: Box::new(pos.deep_clone(seen_tees)),
2874                neg: Box::new(neg.deep_clone(seen_tees)),
2875                metadata: metadata.clone(),
2876            },
2877            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2878                input: Box::new(input.deep_clone(seen_tees)),
2879                metadata: metadata.clone(),
2880            },
2881            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2882                HydroNode::ResolveFuturesBlocking {
2883                    input: Box::new(input.deep_clone(seen_tees)),
2884                    metadata: metadata.clone(),
2885                }
2886            }
2887            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2888                HydroNode::ResolveFuturesOrdered {
2889                    input: Box::new(input.deep_clone(seen_tees)),
2890                    metadata: metadata.clone(),
2891                }
2892            }
2893            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2894                f: f.deep_clone(seen_tees),
2895                input: Box::new(input.deep_clone(seen_tees)),
2896                metadata: metadata.clone(),
2897            },
2898            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2899                f: f.deep_clone(seen_tees),
2900                input: Box::new(input.deep_clone(seen_tees)),
2901                metadata: metadata.clone(),
2902            },
2903            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2904                HydroNode::FlatMapStreamBlocking {
2905                    f: f.deep_clone(seen_tees),
2906                    input: Box::new(input.deep_clone(seen_tees)),
2907                    metadata: metadata.clone(),
2908                }
2909            }
2910            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2911                f: f.deep_clone(seen_tees),
2912                input: Box::new(input.deep_clone(seen_tees)),
2913                metadata: metadata.clone(),
2914            },
2915            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2916                f: f.deep_clone(seen_tees),
2917                input: Box::new(input.deep_clone(seen_tees)),
2918                metadata: metadata.clone(),
2919            },
2920            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2921                input: Box::new(input.deep_clone(seen_tees)),
2922                metadata: metadata.clone(),
2923            },
2924            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2925                input: Box::new(input.deep_clone(seen_tees)),
2926                metadata: metadata.clone(),
2927            },
2928            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2929                f: f.deep_clone(seen_tees),
2930                input: Box::new(input.deep_clone(seen_tees)),
2931                metadata: metadata.clone(),
2932            },
2933            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2934                input: Box::new(input.deep_clone(seen_tees)),
2935                metadata: metadata.clone(),
2936            },
2937            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2938                input: Box::new(input.deep_clone(seen_tees)),
2939                metadata: metadata.clone(),
2940            },
2941            HydroNode::Fold {
2942                init,
2943                acc,
2944                input,
2945                metadata,
2946            } => HydroNode::Fold {
2947                init: init.deep_clone(seen_tees),
2948                acc: acc.deep_clone(seen_tees),
2949                input: Box::new(input.deep_clone(seen_tees)),
2950                metadata: metadata.clone(),
2951            },
2952            HydroNode::Scan {
2953                init,
2954                acc,
2955                input,
2956                metadata,
2957            } => HydroNode::Scan {
2958                init: init.deep_clone(seen_tees),
2959                acc: acc.deep_clone(seen_tees),
2960                input: Box::new(input.deep_clone(seen_tees)),
2961                metadata: metadata.clone(),
2962            },
2963            HydroNode::ScanAsyncBlocking {
2964                init,
2965                acc,
2966                input,
2967                metadata,
2968            } => HydroNode::ScanAsyncBlocking {
2969                init: init.deep_clone(seen_tees),
2970                acc: acc.deep_clone(seen_tees),
2971                input: Box::new(input.deep_clone(seen_tees)),
2972                metadata: metadata.clone(),
2973            },
2974            HydroNode::FoldKeyed {
2975                init,
2976                acc,
2977                input,
2978                metadata,
2979            } => HydroNode::FoldKeyed {
2980                init: init.deep_clone(seen_tees),
2981                acc: acc.deep_clone(seen_tees),
2982                input: Box::new(input.deep_clone(seen_tees)),
2983                metadata: metadata.clone(),
2984            },
2985            HydroNode::ReduceKeyedWatermark {
2986                f,
2987                input,
2988                watermark,
2989                metadata,
2990            } => HydroNode::ReduceKeyedWatermark {
2991                f: f.deep_clone(seen_tees),
2992                input: Box::new(input.deep_clone(seen_tees)),
2993                watermark: Box::new(watermark.deep_clone(seen_tees)),
2994                metadata: metadata.clone(),
2995            },
2996            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2997                f: f.deep_clone(seen_tees),
2998                input: Box::new(input.deep_clone(seen_tees)),
2999                metadata: metadata.clone(),
3000            },
3001            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3002                f: f.deep_clone(seen_tees),
3003                input: Box::new(input.deep_clone(seen_tees)),
3004                metadata: metadata.clone(),
3005            },
3006            HydroNode::Network {
3007                name,
3008                networking_info,
3009                serialize_fn,
3010                instantiate_fn,
3011                deserialize_fn,
3012                input,
3013                metadata,
3014            } => HydroNode::Network {
3015                name: name.clone(),
3016                networking_info: networking_info.clone(),
3017                serialize_fn: serialize_fn.clone(),
3018                instantiate_fn: instantiate_fn.clone(),
3019                deserialize_fn: deserialize_fn.clone(),
3020                input: Box::new(input.deep_clone(seen_tees)),
3021                metadata: metadata.clone(),
3022            },
3023            HydroNode::ExternalInput {
3024                from_external_key,
3025                from_port_id,
3026                from_many,
3027                codec_type,
3028                port_hint,
3029                instantiate_fn,
3030                deserialize_fn,
3031                metadata,
3032            } => HydroNode::ExternalInput {
3033                from_external_key: *from_external_key,
3034                from_port_id: *from_port_id,
3035                from_many: *from_many,
3036                codec_type: codec_type.clone(),
3037                port_hint: *port_hint,
3038                instantiate_fn: instantiate_fn.clone(),
3039                deserialize_fn: deserialize_fn.clone(),
3040                metadata: metadata.clone(),
3041            },
3042            HydroNode::Counter {
3043                tag,
3044                duration,
3045                prefix,
3046                input,
3047                metadata,
3048            } => HydroNode::Counter {
3049                tag: tag.clone(),
3050                duration: duration.clone(),
3051                prefix: prefix.clone(),
3052                input: Box::new(input.deep_clone(seen_tees)),
3053                metadata: metadata.clone(),
3054            },
3055        }
3056    }
3057
3058    #[cfg(feature = "build")]
3059    pub fn emit_core(
3060        &mut self,
3061        builders_or_callback: &mut BuildersOrCallback<
3062            impl FnMut(&mut HydroRoot, &mut StmtId),
3063            impl FnMut(&mut HydroNode, &mut StmtId),
3064        >,
3065        seen_tees: &mut SeenSharedNodes,
3066        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3067        next_stmt_id: &mut StmtId,
3068        fold_hooked_idents: &mut HashSet<String>,
3069    ) -> syn::Ident {
3070        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3071        let mut singleton_access_counters: HashMap<*const RefCell<HydroNode>, u32> = HashMap::new();
3072
3073        self.transform_bottom_up(
3074            &mut |node: &mut HydroNode| {
3075                let out_location = node.metadata().location_id.clone();
3076                match node {
3077                    HydroNode::Placeholder => {
3078                        panic!()
3079                    }
3080
3081                    HydroNode::Cast { .. } => {
3082                        // Cast passes through the input ident unchanged
3083                        // The input ident is already on the stack from processing the child
3084                        match builders_or_callback {
3085                            BuildersOrCallback::Builders(_) => {}
3086                            BuildersOrCallback::Callback(_, node_callback) => {
3087                                node_callback(node, next_stmt_id);
3088                            }
3089                        }
3090
3091                        let _ = next_stmt_id.get_and_increment();
3092                        // input_ident stays on stack as output
3093                    }
3094
3095                    HydroNode::UnboundSingleton { .. } => {
3096                        let inner_ident = ident_stack.pop().unwrap();
3097
3098                        let out_ident =
3099                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3100
3101                        match builders_or_callback {
3102                            BuildersOrCallback::Builders(graph_builders) => {
3103                                if graph_builders.singleton_intermediates() {
3104                                    let builder = graph_builders.get_dfir_mut(&out_location);
3105                                    builder.add_dfir(
3106                                        parse_quote! {
3107                                            #out_ident = #inner_ident;
3108                                        },
3109                                        None,
3110                                        None,
3111                                    );
3112                                } else {
3113                                    let builder = graph_builders.get_dfir_mut(&out_location);
3114                                    builder.add_dfir(
3115                                        parse_quote! {
3116                                            #out_ident = #inner_ident -> persist::<'static>();
3117                                        },
3118                                        None,
3119                                        None,
3120                                    );
3121                                }
3122                            }
3123                            BuildersOrCallback::Callback(_, node_callback) => {
3124                                node_callback(node, next_stmt_id);
3125                            }
3126                        }
3127
3128                        let _ = next_stmt_id.get_and_increment();
3129
3130                        ident_stack.push(out_ident);
3131                    }
3132
3133                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3134                        let inner_ident = ident_stack.pop().unwrap();
3135
3136                        let out_ident =
3137                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3138
3139                        match builders_or_callback {
3140                            BuildersOrCallback::Builders(graph_builders) => {
3141                                graph_builders.assert_is_consistent(
3142                                    *trusted,
3143                                    &inner.metadata().location_id,
3144                                    inner_ident,
3145                                    &out_ident,
3146                                );
3147                            }
3148                            BuildersOrCallback::Callback(_, node_callback) => {
3149                                node_callback(node, next_stmt_id);
3150                            }
3151                        }
3152
3153                        let _ = next_stmt_id.get_and_increment();
3154
3155                        ident_stack.push(out_ident);
3156                    }
3157
3158                    HydroNode::ObserveNonDet {
3159                        inner,
3160                        trusted,
3161                        metadata,
3162                        ..
3163                    } => {
3164                        let inner_ident = ident_stack.pop().unwrap();
3165
3166                        let observe_ident =
3167                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3168
3169                        match builders_or_callback {
3170                            BuildersOrCallback::Builders(graph_builders) => {
3171                                graph_builders.observe_nondet(
3172                                    *trusted,
3173                                    &inner.metadata().location_id,
3174                                    inner_ident,
3175                                    &inner.metadata().collection_kind,
3176                                    &observe_ident,
3177                                    &metadata.collection_kind,
3178                                    &metadata.op,
3179                                );
3180                            }
3181                            BuildersOrCallback::Callback(_, node_callback) => {
3182                                node_callback(node, next_stmt_id);
3183                            }
3184                        }
3185
3186                        let _ = next_stmt_id.get_and_increment();
3187
3188                        ident_stack.push(observe_ident);
3189                    }
3190
3191                    HydroNode::Batch {
3192                        inner, metadata, ..
3193                    } => {
3194                        let inner_ident = ident_stack.pop().unwrap();
3195
3196                        let batch_ident =
3197                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3198
3199                        match builders_or_callback {
3200                            BuildersOrCallback::Builders(graph_builders) => {
3201                                graph_builders.batch(
3202                                    inner_ident,
3203                                    &inner.metadata().location_id,
3204                                    &inner.metadata().collection_kind,
3205                                    &batch_ident,
3206                                    &out_location,
3207                                    &metadata.op,
3208                                    fold_hooked_idents,
3209                                );
3210                            }
3211                            BuildersOrCallback::Callback(_, node_callback) => {
3212                                node_callback(node, next_stmt_id);
3213                            }
3214                        }
3215
3216                        let _ = next_stmt_id.get_and_increment();
3217
3218                        ident_stack.push(batch_ident);
3219                    }
3220
3221                    HydroNode::YieldConcat { inner, .. } => {
3222                        let inner_ident = ident_stack.pop().unwrap();
3223
3224                        let yield_ident =
3225                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3226
3227                        match builders_or_callback {
3228                            BuildersOrCallback::Builders(graph_builders) => {
3229                                graph_builders.yield_from_tick(
3230                                    inner_ident,
3231                                    &inner.metadata().location_id,
3232                                    &inner.metadata().collection_kind,
3233                                    &yield_ident,
3234                                    &out_location,
3235                                );
3236                            }
3237                            BuildersOrCallback::Callback(_, node_callback) => {
3238                                node_callback(node, next_stmt_id);
3239                            }
3240                        }
3241
3242                        let _ = next_stmt_id.get_and_increment();
3243
3244                        ident_stack.push(yield_ident);
3245                    }
3246
3247                    HydroNode::BeginAtomic { inner, metadata } => {
3248                        let inner_ident = ident_stack.pop().unwrap();
3249
3250                        let begin_ident =
3251                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3252
3253                        match builders_or_callback {
3254                            BuildersOrCallback::Builders(graph_builders) => {
3255                                graph_builders.begin_atomic(
3256                                    inner_ident,
3257                                    &inner.metadata().location_id,
3258                                    &inner.metadata().collection_kind,
3259                                    &begin_ident,
3260                                    &out_location,
3261                                    &metadata.op,
3262                                );
3263                            }
3264                            BuildersOrCallback::Callback(_, node_callback) => {
3265                                node_callback(node, next_stmt_id);
3266                            }
3267                        }
3268
3269                        let _ = next_stmt_id.get_and_increment();
3270
3271                        ident_stack.push(begin_ident);
3272                    }
3273
3274                    HydroNode::EndAtomic { inner, .. } => {
3275                        let inner_ident = ident_stack.pop().unwrap();
3276
3277                        let end_ident =
3278                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3279
3280                        match builders_or_callback {
3281                            BuildersOrCallback::Builders(graph_builders) => {
3282                                graph_builders.end_atomic(
3283                                    inner_ident,
3284                                    &inner.metadata().location_id,
3285                                    &inner.metadata().collection_kind,
3286                                    &end_ident,
3287                                );
3288                            }
3289                            BuildersOrCallback::Callback(_, node_callback) => {
3290                                node_callback(node, next_stmt_id);
3291                            }
3292                        }
3293
3294                        let _ = next_stmt_id.get_and_increment();
3295
3296                        ident_stack.push(end_ident);
3297                    }
3298
3299                    HydroNode::Source {
3300                        source, metadata, ..
3301                    } => {
3302                        if let HydroSource::ExternalNetwork() = source {
3303                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3304                        } else {
3305                            let source_ident =
3306                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3307
3308                            let source_stmt = match source {
3309                                HydroSource::Stream(expr) => {
3310                                    debug_assert!(metadata.location_id.is_top_level());
3311                                    parse_quote! {
3312                                        #source_ident = source_stream(#expr);
3313                                    }
3314                                }
3315
3316                                HydroSource::ExternalNetwork() => {
3317                                    unreachable!()
3318                                }
3319
3320                                HydroSource::Iter(expr) => {
3321                                    if metadata.location_id.is_top_level() {
3322                                        parse_quote! {
3323                                            #source_ident = source_iter(#expr);
3324                                        }
3325                                    } else {
3326                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
3327                                        parse_quote! {
3328                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3329                                        }
3330                                    }
3331                                }
3332
3333                                HydroSource::Spin() => {
3334                                    debug_assert!(metadata.location_id.is_top_level());
3335                                    parse_quote! {
3336                                        #source_ident = spin();
3337                                    }
3338                                }
3339
3340                                HydroSource::ClusterMembers(target_loc, state) => {
3341                                    debug_assert!(metadata.location_id.is_top_level());
3342
3343                                    let members_tee_ident = syn::Ident::new(
3344                                        &format!(
3345                                            "__cluster_members_tee_{}_{}",
3346                                            metadata.location_id.root().key(),
3347                                            target_loc.key(),
3348                                        ),
3349                                        Span::call_site(),
3350                                    );
3351
3352                                    match state {
3353                                        ClusterMembersState::Stream(d) => {
3354                                            parse_quote! {
3355                                                #members_tee_ident = source_stream(#d) -> tee();
3356                                                #source_ident = #members_tee_ident;
3357                                            }
3358                                        },
3359                                        ClusterMembersState::Uninit => syn::parse_quote! {
3360                                            #source_ident = source_stream(DUMMY);
3361                                        },
3362                                        ClusterMembersState::Tee(..) => parse_quote! {
3363                                            #source_ident = #members_tee_ident;
3364                                        },
3365                                    }
3366                                }
3367
3368                                HydroSource::Embedded(ident) => {
3369                                    parse_quote! {
3370                                        #source_ident = source_stream(#ident);
3371                                    }
3372                                }
3373
3374                                HydroSource::EmbeddedSingleton(ident) => {
3375                                    parse_quote! {
3376                                        #source_ident = source_iter([#ident]);
3377                                    }
3378                                }
3379                            };
3380
3381                            match builders_or_callback {
3382                                BuildersOrCallback::Builders(graph_builders) => {
3383                                    let builder = graph_builders.get_dfir_mut(&out_location);
3384                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3385                                }
3386                                BuildersOrCallback::Callback(_, node_callback) => {
3387                                    node_callback(node, next_stmt_id);
3388                                }
3389                            }
3390
3391                            let _ = next_stmt_id.get_and_increment();
3392
3393                            ident_stack.push(source_ident);
3394                        }
3395                    }
3396
3397                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3398                        let source_ident =
3399                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3400
3401                        match builders_or_callback {
3402                            BuildersOrCallback::Builders(graph_builders) => {
3403                                let builder = graph_builders.get_dfir_mut(&out_location);
3404
3405                                if *first_tick_only {
3406                                    assert!(
3407                                        !metadata.location_id.is_top_level(),
3408                                        "first_tick_only SingletonSource must be inside a tick"
3409                                    );
3410                                }
3411
3412                                if *first_tick_only
3413                                    || (metadata.location_id.is_top_level()
3414                                        && metadata.collection_kind.is_bounded())
3415                                {
3416                                    builder.add_dfir(
3417                                        parse_quote! {
3418                                            #source_ident = source_iter([#value]);
3419                                        },
3420                                        None,
3421                                        Some(&next_stmt_id.to_string()),
3422                                    );
3423                                } else {
3424                                    builder.add_dfir(
3425                                        parse_quote! {
3426                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3427                                        },
3428                                        None,
3429                                        Some(&next_stmt_id.to_string()),
3430                                    );
3431                                }
3432                            }
3433                            BuildersOrCallback::Callback(_, node_callback) => {
3434                                node_callback(node, next_stmt_id);
3435                            }
3436                        }
3437
3438                        let _ = next_stmt_id.get_and_increment();
3439
3440                        ident_stack.push(source_ident);
3441                    }
3442
3443                    HydroNode::CycleSource { cycle_id, .. } => {
3444                        let ident = cycle_id.as_ident();
3445
3446                        match builders_or_callback {
3447                            BuildersOrCallback::Builders(_) => {}
3448                            BuildersOrCallback::Callback(_, node_callback) => {
3449                                node_callback(node, next_stmt_id);
3450                            }
3451                        }
3452
3453                        // consume a stmt id even though we did not emit anything so that we can instrument this
3454                        let _ = next_stmt_id.get_and_increment();
3455
3456                        ident_stack.push(ident);
3457                    }
3458
3459                    HydroNode::Tee { inner, .. } => {
3460                        let ret_ident = if let Some(built_idents) =
3461                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3462                        {
3463                            match builders_or_callback {
3464                                BuildersOrCallback::Builders(_) => {}
3465                                BuildersOrCallback::Callback(_, node_callback) => {
3466                                    node_callback(node, next_stmt_id);
3467                                }
3468                            }
3469
3470                            built_idents[0].clone()
3471                        } else {
3472                            // The inner node was already processed by transform_bottom_up,
3473                            // so its ident is on the stack
3474                            let inner_ident = ident_stack.pop().unwrap();
3475
3476                            let tee_ident =
3477                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3478
3479                            built_tees.insert(
3480                                inner.0.as_ref() as *const RefCell<HydroNode>,
3481                                vec![tee_ident.clone()],
3482                            );
3483
3484                            match builders_or_callback {
3485                                BuildersOrCallback::Builders(graph_builders) => {
3486                                    // NOTE: With `forward_ref`, the fold codegen may not have
3487                                    // run yet when we reach this tee, so `fold_hooked_idents`
3488                                    // might not contain the inner ident. In that case we won't
3489                                    // propagate the "hooked" status to the tee and the
3490                                    // downstream singleton batch will use the normal
3491                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3492                                    // This is not a soundness issue: the fallback hook still
3493                                    // produces correct behavior, just with a redundant decision
3494                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3495                                    // fix ordering so forward_ref folds are always processed
3496                                    // before their downstream tees.
3497                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3498                                        fold_hooked_idents.insert(tee_ident.to_string());
3499                                    }
3500                                    let builder = graph_builders.get_dfir_mut(&out_location);
3501                                    builder.add_dfir(
3502                                        parse_quote! {
3503                                            #tee_ident = #inner_ident -> tee();
3504                                        },
3505                                        None,
3506                                        Some(&next_stmt_id.to_string()),
3507                                    );
3508                                }
3509                                BuildersOrCallback::Callback(_, node_callback) => {
3510                                    node_callback(node, next_stmt_id);
3511                                }
3512                            }
3513
3514                            tee_ident
3515                        };
3516
3517                        // we consume a stmt id regardless of if we emit the tee() operator,
3518                        // so that during rewrites we touch all recipients of the tee()
3519
3520                        let _ = next_stmt_id.get_and_increment();
3521                        ident_stack.push(ret_ident);
3522                    }
3523
3524                    HydroNode::Reference { inner, kind, .. } => {
3525                        let ret_ident = if let Some(built_idents) =
3526                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3527                        {
3528                            built_idents[0].clone()
3529                        } else {
3530                            let inner_ident = ident_stack.pop().unwrap();
3531
3532                            let ref_ident =
3533                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3534
3535                            built_tees.insert(
3536                                inner.0.as_ref() as *const RefCell<HydroNode>,
3537                                vec![ref_ident.clone()],
3538                            );
3539
3540                            match builders_or_callback {
3541                                BuildersOrCallback::Builders(graph_builders) => {
3542                                    let builder = graph_builders.get_dfir_mut(&out_location);
3543                                    let op_ident = syn::Ident::new(
3544                                        match kind {
3545                                            crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3546                                            crate::handoff_ref::HandoffRefKind::Optional => "optional",
3547                                            crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3548                                        },
3549                                        Span::call_site(),
3550                                    );
3551                                    builder.add_dfir(
3552                                        parse_quote! {
3553                                            #ref_ident = #inner_ident -> #op_ident();
3554                                        },
3555                                        None,
3556                                        Some(&next_stmt_id.to_string()),
3557                                    );
3558                                }
3559                                BuildersOrCallback::Callback(_, node_callback) => {
3560                                    node_callback(node, next_stmt_id);
3561                                }
3562                            }
3563
3564                            ref_ident
3565                        };
3566
3567                        // we consume a stmt id regardless of if we emit the operator,
3568                        // so that during rewrites we touch all recipients
3569                        let _ = next_stmt_id.get_and_increment();
3570                        ident_stack.push(ret_ident);
3571                    }
3572
3573                    HydroNode::Partition {
3574                        inner, f, is_true, ..
3575                    } => {
3576                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3577                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3578                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3579                            match builders_or_callback {
3580                                BuildersOrCallback::Builders(_) => {}
3581                                BuildersOrCallback::Callback(_, node_callback) => {
3582                                    node_callback(node, next_stmt_id);
3583                                }
3584                            }
3585
3586                            let idx = if is_true { 0 } else { 1 };
3587                            built_idents[idx].clone()
3588                        } else {
3589                            // The inner node was already processed by transform_bottom_up,
3590                            // so its ident is on the stack
3591                            let inner_ident = ident_stack.pop().unwrap();
3592                            let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
3593
3594                            let partition_ident = syn::Ident::new(
3595                                &format!("stream_{}_partition", *next_stmt_id),
3596                                Span::call_site(),
3597                            );
3598                            let true_ident = syn::Ident::new(
3599                                &format!("stream_{}_true", *next_stmt_id),
3600                                Span::call_site(),
3601                            );
3602                            let false_ident = syn::Ident::new(
3603                                &format!("stream_{}_false", *next_stmt_id),
3604                                Span::call_site(),
3605                            );
3606
3607                            built_tees.insert(
3608                                ptr,
3609                                vec![true_ident.clone(), false_ident.clone()],
3610                            );
3611
3612                            match builders_or_callback {
3613                                BuildersOrCallback::Builders(graph_builders) => {
3614                                    let builder = graph_builders.get_dfir_mut(&out_location);
3615                                    builder.add_dfir(
3616                                        parse_quote! {
3617                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3618                                            #true_ident = #partition_ident[0];
3619                                            #false_ident = #partition_ident[1];
3620                                        },
3621                                        None,
3622                                        Some(&next_stmt_id.to_string()),
3623                                    );
3624                                }
3625                                BuildersOrCallback::Callback(_, node_callback) => {
3626                                    node_callback(node, next_stmt_id);
3627                                }
3628                            }
3629
3630                            if is_true { true_ident } else { false_ident }
3631                        };
3632
3633                        let _ = next_stmt_id.get_and_increment();
3634                        ident_stack.push(ret_ident);
3635                    }
3636
3637                    HydroNode::Chain { .. } => {
3638                        // Children are processed left-to-right, so second is on top
3639                        let second_ident = ident_stack.pop().unwrap();
3640                        let first_ident = ident_stack.pop().unwrap();
3641
3642                        let chain_ident =
3643                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3644
3645                        match builders_or_callback {
3646                            BuildersOrCallback::Builders(graph_builders) => {
3647                                let builder = graph_builders.get_dfir_mut(&out_location);
3648                                builder.add_dfir(
3649                                    parse_quote! {
3650                                        #chain_ident = chain();
3651                                        #first_ident -> [0]#chain_ident;
3652                                        #second_ident -> [1]#chain_ident;
3653                                    },
3654                                    None,
3655                                    Some(&next_stmt_id.to_string()),
3656                                );
3657                            }
3658                            BuildersOrCallback::Callback(_, node_callback) => {
3659                                node_callback(node, next_stmt_id);
3660                            }
3661                        }
3662
3663                        let _ = next_stmt_id.get_and_increment();
3664
3665                        ident_stack.push(chain_ident);
3666                    }
3667
3668                    HydroNode::MergeOrdered { first, metadata, .. } => {
3669                        let second_ident = ident_stack.pop().unwrap();
3670                        let first_ident = ident_stack.pop().unwrap();
3671
3672                        let merge_ident =
3673                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3674
3675                        match builders_or_callback {
3676                            BuildersOrCallback::Builders(graph_builders) => {
3677                                graph_builders.merge_ordered(
3678                                    &first.metadata().location_id,
3679                                    first_ident,
3680                                    second_ident,
3681                                    &merge_ident,
3682                                    &first.metadata().collection_kind,
3683                                    &metadata.op,
3684                                    Some(&next_stmt_id.to_string()),
3685                                );
3686                            }
3687                            BuildersOrCallback::Callback(_, node_callback) => {
3688                                node_callback(node, next_stmt_id);
3689                            }
3690                        }
3691
3692                        let _ = next_stmt_id.get_and_increment();
3693
3694                        ident_stack.push(merge_ident);
3695                    }
3696
3697                    HydroNode::ChainFirst { .. } => {
3698                        let second_ident = ident_stack.pop().unwrap();
3699                        let first_ident = ident_stack.pop().unwrap();
3700
3701                        let chain_ident =
3702                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3703
3704                        match builders_or_callback {
3705                            BuildersOrCallback::Builders(graph_builders) => {
3706                                let builder = graph_builders.get_dfir_mut(&out_location);
3707                                builder.add_dfir(
3708                                    parse_quote! {
3709                                        #chain_ident = chain_first_n(1);
3710                                        #first_ident -> [0]#chain_ident;
3711                                        #second_ident -> [1]#chain_ident;
3712                                    },
3713                                    None,
3714                                    Some(&next_stmt_id.to_string()),
3715                                );
3716                            }
3717                            BuildersOrCallback::Callback(_, node_callback) => {
3718                                node_callback(node, next_stmt_id);
3719                            }
3720                        }
3721
3722                        let _ = next_stmt_id.get_and_increment();
3723
3724                        ident_stack.push(chain_ident);
3725                    }
3726
3727                    HydroNode::CrossSingleton { right, .. } => {
3728                        let right_ident = ident_stack.pop().unwrap();
3729                        let left_ident = ident_stack.pop().unwrap();
3730
3731                        let cross_ident =
3732                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3733
3734                        match builders_or_callback {
3735                            BuildersOrCallback::Builders(graph_builders) => {
3736                                let builder = graph_builders.get_dfir_mut(&out_location);
3737
3738                                if right.metadata().location_id.is_top_level()
3739                                    && right.metadata().collection_kind.is_bounded()
3740                                {
3741                                    builder.add_dfir(
3742                                        parse_quote! {
3743                                            #cross_ident = cross_singleton::<'static>();
3744                                            #left_ident -> [input]#cross_ident;
3745                                            #right_ident -> [single]#cross_ident;
3746                                        },
3747                                        None,
3748                                        Some(&next_stmt_id.to_string()),
3749                                    );
3750                                } else {
3751                                    builder.add_dfir(
3752                                        parse_quote! {
3753                                            #cross_ident = cross_singleton();
3754                                            #left_ident -> [input]#cross_ident;
3755                                            #right_ident -> [single]#cross_ident;
3756                                        },
3757                                        None,
3758                                        Some(&next_stmt_id.to_string()),
3759                                    );
3760                                }
3761                            }
3762                            BuildersOrCallback::Callback(_, node_callback) => {
3763                                node_callback(node, next_stmt_id);
3764                            }
3765                        }
3766
3767                        let _ = next_stmt_id.get_and_increment();
3768
3769                        ident_stack.push(cross_ident);
3770                    }
3771
3772                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3773                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3774                            parse_quote!(cross_join_multiset)
3775                        } else {
3776                            parse_quote!(join_multiset)
3777                        };
3778
3779                        let (HydroNode::CrossProduct { left, right, .. }
3780                        | HydroNode::Join { left, right, .. }) = node
3781                        else {
3782                            unreachable!()
3783                        };
3784
3785                        let is_top_level = left.metadata().location_id.is_top_level()
3786                            && right.metadata().location_id.is_top_level();
3787                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3788                            quote!('static)
3789                        } else {
3790                            quote!('tick)
3791                        };
3792
3793                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3794                            quote!('static)
3795                        } else {
3796                            quote!('tick)
3797                        };
3798
3799                        let right_ident = ident_stack.pop().unwrap();
3800                        let left_ident = ident_stack.pop().unwrap();
3801
3802                        let stream_ident =
3803                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3804
3805                        match builders_or_callback {
3806                            BuildersOrCallback::Builders(graph_builders) => {
3807                                let builder = graph_builders.get_dfir_mut(&out_location);
3808                                builder.add_dfir(
3809                                    if is_top_level {
3810                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3811                                        // a multiset_delta() to negate the replay behavior
3812                                        parse_quote! {
3813                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3814                                            #left_ident -> [0]#stream_ident;
3815                                            #right_ident -> [1]#stream_ident;
3816                                        }
3817                                    } else {
3818                                        parse_quote! {
3819                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3820                                            #left_ident -> [0]#stream_ident;
3821                                            #right_ident -> [1]#stream_ident;
3822                                        }
3823                                    }
3824                                    ,
3825                                    None,
3826                                    Some(&next_stmt_id.to_string()),
3827                                );
3828                            }
3829                            BuildersOrCallback::Callback(_, node_callback) => {
3830                                node_callback(node, next_stmt_id);
3831                            }
3832                        }
3833
3834                        let _ = next_stmt_id.get_and_increment();
3835
3836                        ident_stack.push(stream_ident);
3837                    }
3838
3839                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3840                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3841                            parse_quote!(difference)
3842                        } else {
3843                            parse_quote!(anti_join)
3844                        };
3845
3846                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3847                            node
3848                        else {
3849                            unreachable!()
3850                        };
3851
3852                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3853                            quote!('static)
3854                        } else {
3855                            quote!('tick)
3856                        };
3857
3858                        let neg_ident = ident_stack.pop().unwrap();
3859                        let pos_ident = ident_stack.pop().unwrap();
3860
3861                        let stream_ident =
3862                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3863
3864                        match builders_or_callback {
3865                            BuildersOrCallback::Builders(graph_builders) => {
3866                                let builder = graph_builders.get_dfir_mut(&out_location);
3867                                builder.add_dfir(
3868                                    parse_quote! {
3869                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3870                                        #pos_ident -> [pos]#stream_ident;
3871                                        #neg_ident -> [neg]#stream_ident;
3872                                    },
3873                                    None,
3874                                    Some(&next_stmt_id.to_string()),
3875                                );
3876                            }
3877                            BuildersOrCallback::Callback(_, node_callback) => {
3878                                node_callback(node, next_stmt_id);
3879                            }
3880                        }
3881
3882                        let _ = next_stmt_id.get_and_increment();
3883
3884                        ident_stack.push(stream_ident);
3885                    }
3886
3887                    HydroNode::JoinHalf { .. } => {
3888                        let HydroNode::JoinHalf { right, .. } = node else {
3889                            unreachable!()
3890                        };
3891
3892                        assert!(
3893                            right.metadata().collection_kind.is_bounded(),
3894                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3895                            right.metadata().collection_kind
3896                        );
3897
3898                        let build_lifetime = if right.metadata().location_id.is_top_level() {
3899                            quote!('static)
3900                        } else {
3901                            quote!('tick)
3902                        };
3903
3904                        let build_ident = ident_stack.pop().unwrap();
3905                        let probe_ident = ident_stack.pop().unwrap();
3906
3907                        let stream_ident =
3908                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3909
3910                        match builders_or_callback {
3911                            BuildersOrCallback::Builders(graph_builders) => {
3912                                let builder = graph_builders.get_dfir_mut(&out_location);
3913                                builder.add_dfir(
3914                                    parse_quote! {
3915                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3916                                        #probe_ident -> [probe]#stream_ident;
3917                                        #build_ident -> [build]#stream_ident;
3918                                    },
3919                                    None,
3920                                    Some(&next_stmt_id.to_string()),
3921                                );
3922                            }
3923                            BuildersOrCallback::Callback(_, node_callback) => {
3924                                node_callback(node, next_stmt_id);
3925                            }
3926                        }
3927
3928                        let _ = next_stmt_id.get_and_increment();
3929
3930                        ident_stack.push(stream_ident);
3931                    }
3932
3933                    HydroNode::ResolveFutures { .. } => {
3934                        let input_ident = ident_stack.pop().unwrap();
3935
3936                        let futures_ident =
3937                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3938
3939                        match builders_or_callback {
3940                            BuildersOrCallback::Builders(graph_builders) => {
3941                                let builder = graph_builders.get_dfir_mut(&out_location);
3942                                builder.add_dfir(
3943                                    parse_quote! {
3944                                        #futures_ident = #input_ident -> resolve_futures();
3945                                    },
3946                                    None,
3947                                    Some(&next_stmt_id.to_string()),
3948                                );
3949                            }
3950                            BuildersOrCallback::Callback(_, node_callback) => {
3951                                node_callback(node, next_stmt_id);
3952                            }
3953                        }
3954
3955                        let _ = next_stmt_id.get_and_increment();
3956
3957                        ident_stack.push(futures_ident);
3958                    }
3959
3960                    HydroNode::ResolveFuturesBlocking { .. } => {
3961                        let input_ident = ident_stack.pop().unwrap();
3962
3963                        let futures_ident =
3964                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3965
3966                        match builders_or_callback {
3967                            BuildersOrCallback::Builders(graph_builders) => {
3968                                let builder = graph_builders.get_dfir_mut(&out_location);
3969                                builder.add_dfir(
3970                                    parse_quote! {
3971                                        #futures_ident = #input_ident -> resolve_futures_blocking();
3972                                    },
3973                                    None,
3974                                    Some(&next_stmt_id.to_string()),
3975                                );
3976                            }
3977                            BuildersOrCallback::Callback(_, node_callback) => {
3978                                node_callback(node, next_stmt_id);
3979                            }
3980                        }
3981
3982                        let _ = next_stmt_id.get_and_increment();
3983
3984                        ident_stack.push(futures_ident);
3985                    }
3986
3987                    HydroNode::ResolveFuturesOrdered { .. } => {
3988                        let input_ident = ident_stack.pop().unwrap();
3989
3990                        let futures_ident =
3991                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3992
3993                        match builders_or_callback {
3994                            BuildersOrCallback::Builders(graph_builders) => {
3995                                let builder = graph_builders.get_dfir_mut(&out_location);
3996                                builder.add_dfir(
3997                                    parse_quote! {
3998                                        #futures_ident = #input_ident -> resolve_futures_ordered();
3999                                    },
4000                                    None,
4001                                    Some(&next_stmt_id.to_string()),
4002                                );
4003                            }
4004                            BuildersOrCallback::Callback(_, node_callback) => {
4005                                node_callback(node, next_stmt_id);
4006                            }
4007                        }
4008
4009                        let _ = next_stmt_id.get_and_increment();
4010
4011                        ident_stack.push(futures_ident);
4012                    }
4013
4014                    HydroNode::Map { f, .. } => {
4015                        // Pop input ident (pushed last by transform_children).
4016                        let input_ident = ident_stack.pop().unwrap();
4017                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4018
4019                        let map_ident =
4020                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4021
4022                        match builders_or_callback {
4023                            BuildersOrCallback::Builders(graph_builders) => {
4024                                let builder = graph_builders.get_dfir_mut(&out_location);
4025                                builder.add_dfir(
4026                                    parse_quote! {
4027                                        #map_ident = #input_ident -> map(#f_tokens);
4028                                    },
4029                                    None,
4030                                    Some(&next_stmt_id.to_string()),
4031                                );
4032                            }
4033                            BuildersOrCallback::Callback(_, node_callback) => {
4034                                node_callback(node, next_stmt_id);
4035                            }
4036                        }
4037
4038                        let _ = next_stmt_id.get_and_increment();
4039
4040                        ident_stack.push(map_ident);
4041                    }
4042
4043                    HydroNode::FlatMap { f, .. } => {
4044                        let input_ident = ident_stack.pop().unwrap();
4045                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4046
4047                        let flat_map_ident =
4048                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4049
4050                        match builders_or_callback {
4051                            BuildersOrCallback::Builders(graph_builders) => {
4052                                let builder = graph_builders.get_dfir_mut(&out_location);
4053                                builder.add_dfir(
4054                                    parse_quote! {
4055                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4056                                    },
4057                                    None,
4058                                    Some(&next_stmt_id.to_string()),
4059                                );
4060                            }
4061                            BuildersOrCallback::Callback(_, node_callback) => {
4062                                node_callback(node, next_stmt_id);
4063                            }
4064                        }
4065
4066                        let _ = next_stmt_id.get_and_increment();
4067
4068                        ident_stack.push(flat_map_ident);
4069                    }
4070
4071                    HydroNode::FlatMapStreamBlocking { f, .. } => {
4072                        let input_ident = ident_stack.pop().unwrap();
4073                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4074
4075                        let flat_map_stream_blocking_ident =
4076                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4077
4078                        match builders_or_callback {
4079                            BuildersOrCallback::Builders(graph_builders) => {
4080                                let builder = graph_builders.get_dfir_mut(&out_location);
4081                                builder.add_dfir(
4082                                    parse_quote! {
4083                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4084                                    },
4085                                    None,
4086                                    Some(&next_stmt_id.to_string()),
4087                                );
4088                            }
4089                            BuildersOrCallback::Callback(_, node_callback) => {
4090                                node_callback(node, next_stmt_id);
4091                            }
4092                        }
4093
4094                        let _ = next_stmt_id.get_and_increment();
4095
4096                        ident_stack.push(flat_map_stream_blocking_ident);
4097                    }
4098
4099                    HydroNode::Filter { f, .. } => {
4100                        let input_ident = ident_stack.pop().unwrap();
4101                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4102
4103                        let filter_ident =
4104                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4105
4106                        match builders_or_callback {
4107                            BuildersOrCallback::Builders(graph_builders) => {
4108                                let builder = graph_builders.get_dfir_mut(&out_location);
4109                                builder.add_dfir(
4110                                    parse_quote! {
4111                                        #filter_ident = #input_ident -> filter(#f_tokens);
4112                                    },
4113                                    None,
4114                                    Some(&next_stmt_id.to_string()),
4115                                );
4116                            }
4117                            BuildersOrCallback::Callback(_, node_callback) => {
4118                                node_callback(node, next_stmt_id);
4119                            }
4120                        }
4121
4122                        let _ = next_stmt_id.get_and_increment();
4123
4124                        ident_stack.push(filter_ident);
4125                    }
4126
4127                    HydroNode::FilterMap { f, .. } => {
4128                        let input_ident = ident_stack.pop().unwrap();
4129                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4130
4131                        let filter_map_ident =
4132                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4133
4134                        match builders_or_callback {
4135                            BuildersOrCallback::Builders(graph_builders) => {
4136                                let builder = graph_builders.get_dfir_mut(&out_location);
4137                                builder.add_dfir(
4138                                    parse_quote! {
4139                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4140                                    },
4141                                    None,
4142                                    Some(&next_stmt_id.to_string()),
4143                                );
4144                            }
4145                            BuildersOrCallback::Callback(_, node_callback) => {
4146                                node_callback(node, next_stmt_id);
4147                            }
4148                        }
4149
4150                        let _ = next_stmt_id.get_and_increment();
4151
4152                        ident_stack.push(filter_map_ident);
4153                    }
4154
4155                    HydroNode::Sort { .. } => {
4156                        let input_ident = ident_stack.pop().unwrap();
4157
4158                        let sort_ident =
4159                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4160
4161                        match builders_or_callback {
4162                            BuildersOrCallback::Builders(graph_builders) => {
4163                                let builder = graph_builders.get_dfir_mut(&out_location);
4164                                builder.add_dfir(
4165                                    parse_quote! {
4166                                        #sort_ident = #input_ident -> sort();
4167                                    },
4168                                    None,
4169                                    Some(&next_stmt_id.to_string()),
4170                                );
4171                            }
4172                            BuildersOrCallback::Callback(_, node_callback) => {
4173                                node_callback(node, next_stmt_id);
4174                            }
4175                        }
4176
4177                        let _ = next_stmt_id.get_and_increment();
4178
4179                        ident_stack.push(sort_ident);
4180                    }
4181
4182                    HydroNode::DeferTick { .. } => {
4183                        let input_ident = ident_stack.pop().unwrap();
4184
4185                        let defer_tick_ident =
4186                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4187
4188                        match builders_or_callback {
4189                            BuildersOrCallback::Builders(graph_builders) => {
4190                                let builder = graph_builders.get_dfir_mut(&out_location);
4191                                builder.add_dfir(
4192                                    parse_quote! {
4193                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4194                                    },
4195                                    None,
4196                                    Some(&next_stmt_id.to_string()),
4197                                );
4198                            }
4199                            BuildersOrCallback::Callback(_, node_callback) => {
4200                                node_callback(node, next_stmt_id);
4201                            }
4202                        }
4203
4204                        let _ = next_stmt_id.get_and_increment();
4205
4206                        ident_stack.push(defer_tick_ident);
4207                    }
4208
4209                    HydroNode::Enumerate { input, .. } => {
4210                        let input_ident = ident_stack.pop().unwrap();
4211
4212                        let enumerate_ident =
4213                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4214
4215                        match builders_or_callback {
4216                            BuildersOrCallback::Builders(graph_builders) => {
4217                                let builder = graph_builders.get_dfir_mut(&out_location);
4218                                let lifetime = if input.metadata().location_id.is_top_level() {
4219                                    quote!('static)
4220                                } else {
4221                                    quote!('tick)
4222                                };
4223                                builder.add_dfir(
4224                                    parse_quote! {
4225                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4226                                    },
4227                                    None,
4228                                    Some(&next_stmt_id.to_string()),
4229                                );
4230                            }
4231                            BuildersOrCallback::Callback(_, node_callback) => {
4232                                node_callback(node, next_stmt_id);
4233                            }
4234                        }
4235
4236                        let _ = next_stmt_id.get_and_increment();
4237
4238                        ident_stack.push(enumerate_ident);
4239                    }
4240
4241                    HydroNode::Inspect { f, .. } => {
4242                        let input_ident = ident_stack.pop().unwrap();
4243                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4244
4245                        let inspect_ident =
4246                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4247
4248                        match builders_or_callback {
4249                            BuildersOrCallback::Builders(graph_builders) => {
4250                                let builder = graph_builders.get_dfir_mut(&out_location);
4251                                builder.add_dfir(
4252                                    parse_quote! {
4253                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4254                                    },
4255                                    None,
4256                                    Some(&next_stmt_id.to_string()),
4257                                );
4258                            }
4259                            BuildersOrCallback::Callback(_, node_callback) => {
4260                                node_callback(node, next_stmt_id);
4261                            }
4262                        }
4263
4264                        let _ = next_stmt_id.get_and_increment();
4265
4266                        ident_stack.push(inspect_ident);
4267                    }
4268
4269                    HydroNode::Unique { input, .. } => {
4270                        let input_ident = ident_stack.pop().unwrap();
4271
4272                        let unique_ident =
4273                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4274
4275                        match builders_or_callback {
4276                            BuildersOrCallback::Builders(graph_builders) => {
4277                                let builder = graph_builders.get_dfir_mut(&out_location);
4278                                let lifetime = if input.metadata().location_id.is_top_level() {
4279                                    quote!('static)
4280                                } else {
4281                                    quote!('tick)
4282                                };
4283
4284                                builder.add_dfir(
4285                                    parse_quote! {
4286                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4287                                    },
4288                                    None,
4289                                    Some(&next_stmt_id.to_string()),
4290                                );
4291                            }
4292                            BuildersOrCallback::Callback(_, node_callback) => {
4293                                node_callback(node, next_stmt_id);
4294                            }
4295                        }
4296
4297                        let _ = next_stmt_id.get_and_increment();
4298
4299                        ident_stack.push(unique_ident);
4300                    }
4301
4302                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4303                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4304                            if input.metadata().location_id.is_top_level()
4305                                && input.metadata().collection_kind.is_bounded()
4306                            {
4307                                parse_quote!(fold_no_replay)
4308                            } else {
4309                                parse_quote!(fold)
4310                            }
4311                        } else if matches!(node, HydroNode::Scan { .. }) {
4312                            parse_quote!(scan)
4313                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4314                            parse_quote!(scan_async_blocking)
4315                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4316                            if input.metadata().location_id.is_top_level()
4317                                && input.metadata().collection_kind.is_bounded()
4318                            {
4319                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4320                            } else {
4321                                parse_quote!(fold_keyed)
4322                            }
4323                        } else {
4324                            unreachable!()
4325                        };
4326
4327                        let (HydroNode::Fold { input, .. }
4328                        | HydroNode::FoldKeyed { input, .. }
4329                        | HydroNode::Scan { input, .. }
4330                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4331                        else {
4332                            unreachable!()
4333                        };
4334
4335                        let lifetime = if input.metadata().location_id.is_top_level() {
4336                            quote!('static)
4337                        } else {
4338                            quote!('tick)
4339                        };
4340
4341                        let input_ident = ident_stack.pop().unwrap();
4342
4343                        let (HydroNode::Fold { init, acc, .. }
4344                        | HydroNode::FoldKeyed { init, acc, .. }
4345                        | HydroNode::Scan { init, acc, .. }
4346                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4347                        else {
4348                            unreachable!()
4349                        };
4350
4351                        let acc_tokens = acc.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4352                        let init_tokens = init.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4353
4354                        let fold_ident =
4355                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4356
4357                        match builders_or_callback {
4358                            BuildersOrCallback::Builders(graph_builders) => {
4359                                if matches!(node, HydroNode::Fold { .. })
4360                                    && node.metadata().location_id.is_top_level()
4361                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4362                                    && graph_builders.singleton_intermediates()
4363                                    && !node.metadata().collection_kind.is_bounded()
4364                                {
4365                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4366                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4367                                        &input.metadata().location_id,
4368                                        &input_ident,
4369                                        &input.metadata().collection_kind,
4370                                        &node.metadata().op,
4371                                    );
4372
4373                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4374                                        let acc: syn::Expr = parse_quote!({
4375                                            let mut __inner = #acc_tokens;
4376                                            move |__state, __batch: Vec<_>| {
4377                                                if __batch.is_empty() {
4378                                                    return None;
4379                                                }
4380                                                for __value in __batch {
4381                                                    __inner(__state, __value);
4382                                                }
4383                                                Some(__state.clone())
4384                                            }
4385                                        });
4386                                        (hooked, acc)
4387                                    } else {
4388                                        let acc: syn::Expr = parse_quote!({
4389                                            let mut __inner = #acc_tokens;
4390                                            move |__state, __value| {
4391                                                __inner(__state, __value);
4392                                                Some(__state.clone())
4393                                            }
4394                                        });
4395                                        (&input_ident, acc)
4396                                    };
4397
4398                                    let builder = graph_builders.get_dfir_mut(&out_location);
4399                                    builder.add_dfir(
4400                                        parse_quote! {
4401                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4402                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4403                                            #fold_ident = chain();
4404                                        },
4405                                        None,
4406                                        Some(&next_stmt_id.to_string()),
4407                                    );
4408
4409                                    if hooked_input_ident.is_some() {
4410                                        fold_hooked_idents.insert(fold_ident.to_string());
4411                                    }
4412                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4413                                    && node.metadata().location_id.is_top_level()
4414                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4415                                    && graph_builders.singleton_intermediates()
4416                                    && !node.metadata().collection_kind.is_bounded()
4417                                {
4418                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4419                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4420                                        &input.metadata().location_id,
4421                                        &input_ident,
4422                                        &input.metadata().collection_kind,
4423                                        &node.metadata().op,
4424                                    );
4425                                    let builder = graph_builders.get_dfir_mut(&out_location);
4426
4427                                    let wrapped_acc: syn::Expr = parse_quote!({
4428                                        let mut __init = #init_tokens;
4429                                        let mut __inner = #acc_tokens;
4430                                        move |__state, __kv: (_, _)| {
4431                                            // TODO(shadaj): we can avoid the clone when the entry exists
4432                                            let __state = __state
4433                                                .entry(::std::clone::Clone::clone(&__kv.0))
4434                                                .or_insert_with(|| (__init)());
4435                                            __inner(__state, __kv.1);
4436                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4437                                        }
4438                                    });
4439
4440                                    if let Some(hooked_input_ident) = hooked_input_ident {
4441                                        builder.add_dfir(
4442                                            parse_quote! {
4443                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4444                                            },
4445                                            None,
4446                                            Some(&next_stmt_id.to_string()),
4447                                        );
4448
4449                                        fold_hooked_idents.insert(fold_ident.to_string());
4450                                    } else {
4451                                        builder.add_dfir(
4452                                            parse_quote! {
4453                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4454                                            },
4455                                            None,
4456                                            Some(&next_stmt_id.to_string()),
4457                                        );
4458                                    }
4459                                } else if (matches!(node, HydroNode::Fold { .. })
4460                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4461                                    && !node.metadata().location_id.is_top_level()
4462                                    && graph_builders.singleton_intermediates()
4463                                {
4464                                    let input_ref = match &*node {
4465                                        HydroNode::Fold { input, .. } => input,
4466                                        HydroNode::FoldKeyed { input, .. } => input,
4467                                        _ => unreachable!(),
4468                                    };
4469                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4470                                        &input_ref.metadata().location_id,
4471                                        &input_ident,
4472                                        &input_ref.metadata().collection_kind,
4473                                        &node.metadata().op,
4474                                    );
4475
4476                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4477                                    let builder = graph_builders.get_dfir_mut(&out_location);
4478                                    builder.add_dfir(
4479                                        parse_quote! {
4480                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4481                                        },
4482                                        None,
4483                                        Some(&next_stmt_id.to_string()),
4484                                    );
4485                                } else {
4486                                    let builder = graph_builders.get_dfir_mut(&out_location);
4487                                    builder.add_dfir(
4488                                        parse_quote! {
4489                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4490                                        },
4491                                        None,
4492                                        Some(&next_stmt_id.to_string()),
4493                                    );
4494                                }
4495                            }
4496                            BuildersOrCallback::Callback(_, node_callback) => {
4497                                node_callback(node, next_stmt_id);
4498                            }
4499                        }
4500
4501                        let _ = next_stmt_id.get_and_increment();
4502
4503                        ident_stack.push(fold_ident);
4504                    }
4505
4506                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4507                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4508                            if input.metadata().location_id.is_top_level()
4509                                && input.metadata().collection_kind.is_bounded()
4510                            {
4511                                parse_quote!(reduce_no_replay)
4512                            } else {
4513                                parse_quote!(reduce)
4514                            }
4515                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4516                            if input.metadata().location_id.is_top_level()
4517                                && input.metadata().collection_kind.is_bounded()
4518                            {
4519                                todo!(
4520                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4521                                )
4522                            } else {
4523                                parse_quote!(reduce_keyed)
4524                            }
4525                        } else {
4526                            unreachable!()
4527                        };
4528
4529                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4530                        else {
4531                            unreachable!()
4532                        };
4533
4534                        let lifetime = if input.metadata().location_id.is_top_level() {
4535                            quote!('static)
4536                        } else {
4537                            quote!('tick)
4538                        };
4539
4540                        let input_ident = ident_stack.pop().unwrap();
4541
4542                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4543                        else {
4544                            unreachable!()
4545                        };
4546
4547                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4548
4549                        let reduce_ident =
4550                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4551
4552                        match builders_or_callback {
4553                            BuildersOrCallback::Builders(graph_builders) => {
4554                                if matches!(node, HydroNode::Reduce { .. })
4555                                    && node.metadata().location_id.is_top_level()
4556                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4557                                    && graph_builders.singleton_intermediates()
4558                                    && !node.metadata().collection_kind.is_bounded()
4559                                {
4560                                    todo!(
4561                                        "Reduce with optional intermediates is not yet supported in simulator"
4562                                    );
4563                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4564                                    && node.metadata().location_id.is_top_level()
4565                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4566                                    && graph_builders.singleton_intermediates()
4567                                    && !node.metadata().collection_kind.is_bounded()
4568                                {
4569                                    todo!(
4570                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4571                                    );
4572                                } else {
4573                                    let builder = graph_builders.get_dfir_mut(&out_location);
4574                                    builder.add_dfir(
4575                                        parse_quote! {
4576                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4577                                        },
4578                                        None,
4579                                        Some(&next_stmt_id.to_string()),
4580                                    );
4581                                }
4582                            }
4583                            BuildersOrCallback::Callback(_, node_callback) => {
4584                                node_callback(node, next_stmt_id);
4585                            }
4586                        }
4587
4588                        let _ = next_stmt_id.get_and_increment();
4589
4590                        ident_stack.push(reduce_ident);
4591                    }
4592
4593                    HydroNode::ReduceKeyedWatermark {
4594                        f,
4595                        input,
4596                        metadata,
4597                        ..
4598                    } => {
4599                        let lifetime = if input.metadata().location_id.is_top_level() {
4600                            quote!('static)
4601                        } else {
4602                            quote!('tick)
4603                        };
4604
4605                        // watermark is processed second, so it's on top
4606                        let watermark_ident = ident_stack.pop().unwrap();
4607                        let input_ident = ident_stack.pop().unwrap();
4608                        let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4609
4610                        let chain_ident = syn::Ident::new(
4611                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4612                            Span::call_site(),
4613                        );
4614
4615                        let fold_ident =
4616                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4617
4618                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4619                            && input.metadata().collection_kind.is_bounded()
4620                        {
4621                            parse_quote!(fold_no_replay)
4622                        } else {
4623                            parse_quote!(fold)
4624                        };
4625
4626                        match builders_or_callback {
4627                            BuildersOrCallback::Builders(graph_builders) => {
4628                                if metadata.location_id.is_top_level()
4629                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4630                                    && graph_builders.singleton_intermediates()
4631                                    && !metadata.collection_kind.is_bounded()
4632                                {
4633                                    todo!(
4634                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4635                                    )
4636                                } else {
4637                                    let builder = graph_builders.get_dfir_mut(&out_location);
4638                                    builder.add_dfir(
4639                                        parse_quote! {
4640                                            #chain_ident = chain();
4641                                            #input_ident
4642                                                -> map(|x| (Some(x), None))
4643                                                -> [0]#chain_ident;
4644                                            #watermark_ident
4645                                                -> map(|watermark| (None, Some(watermark)))
4646                                                -> [1]#chain_ident;
4647
4648                                            #fold_ident = #chain_ident
4649                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4650                                                    let __reduce_keyed_fn = #f_tokens;
4651                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4652                                                        if let Some((k, v)) = opt_payload {
4653                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4654                                                                if k < curr_watermark {
4655                                                                    return;
4656                                                                }
4657                                                            }
4658                                                            match map.entry(k) {
4659                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4660                                                                    e.insert(v);
4661                                                                }
4662                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4663                                                                    __reduce_keyed_fn(e.get_mut(), v);
4664                                                                }
4665                                                            }
4666                                                        } else {
4667                                                            let watermark = opt_watermark.unwrap();
4668                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4669                                                                if watermark <= curr_watermark {
4670                                                                    return;
4671                                                                }
4672                                                            }
4673                                                            map.retain(|k, _| *k >= watermark);
4674                                                            *opt_curr_watermark = Some(watermark);
4675                                                        }
4676                                                    }
4677                                                })
4678                                                -> flat_map(|(map, _curr_watermark)| map);
4679                                        },
4680                                        None,
4681                                        Some(&next_stmt_id.to_string()),
4682                                    );
4683                                }
4684                            }
4685                            BuildersOrCallback::Callback(_, node_callback) => {
4686                                node_callback(node, next_stmt_id);
4687                            }
4688                        }
4689
4690                        let _ = next_stmt_id.get_and_increment();
4691
4692                        ident_stack.push(fold_ident);
4693                    }
4694
4695                    HydroNode::Network {
4696                        networking_info,
4697                        serialize_fn: serialize_pipeline,
4698                        instantiate_fn,
4699                        deserialize_fn: deserialize_pipeline,
4700                        input,
4701                        ..
4702                    } => {
4703                        let input_ident = ident_stack.pop().unwrap();
4704
4705                        let receiver_stream_ident =
4706                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4707
4708                        match builders_or_callback {
4709                            BuildersOrCallback::Builders(graph_builders) => {
4710                                let (sink_expr, source_expr) = match instantiate_fn {
4711                                    DebugInstantiate::Building => (
4712                                        syn::parse_quote!(DUMMY_SINK),
4713                                        syn::parse_quote!(DUMMY_SOURCE),
4714                                    ),
4715
4716                                    DebugInstantiate::Finalized(finalized) => {
4717                                        (finalized.sink.clone(), finalized.source.clone())
4718                                    }
4719                                };
4720
4721                                graph_builders.create_network(
4722                                    &input.metadata().location_id,
4723                                    &out_location,
4724                                    input_ident,
4725                                    &receiver_stream_ident,
4726                                    serialize_pipeline.as_ref(),
4727                                    sink_expr,
4728                                    source_expr,
4729                                    deserialize_pipeline.as_ref(),
4730                                    *next_stmt_id,
4731                                    networking_info,
4732                                );
4733                            }
4734                            BuildersOrCallback::Callback(_, node_callback) => {
4735                                node_callback(node, next_stmt_id);
4736                            }
4737                        }
4738
4739                        let _ = next_stmt_id.get_and_increment();
4740
4741                        ident_stack.push(receiver_stream_ident);
4742                    }
4743
4744                    HydroNode::ExternalInput {
4745                        instantiate_fn,
4746                        deserialize_fn: deserialize_pipeline,
4747                        ..
4748                    } => {
4749                        let receiver_stream_ident =
4750                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4751
4752                        match builders_or_callback {
4753                            BuildersOrCallback::Builders(graph_builders) => {
4754                                let (_, source_expr) = match instantiate_fn {
4755                                    DebugInstantiate::Building => (
4756                                        syn::parse_quote!(DUMMY_SINK),
4757                                        syn::parse_quote!(DUMMY_SOURCE),
4758                                    ),
4759
4760                                    DebugInstantiate::Finalized(finalized) => {
4761                                        (finalized.sink.clone(), finalized.source.clone())
4762                                    }
4763                                };
4764
4765                                graph_builders.create_external_source(
4766                                    &out_location,
4767                                    source_expr,
4768                                    &receiver_stream_ident,
4769                                    deserialize_pipeline.as_ref(),
4770                                    *next_stmt_id,
4771                                );
4772                            }
4773                            BuildersOrCallback::Callback(_, node_callback) => {
4774                                node_callback(node, next_stmt_id);
4775                            }
4776                        }
4777
4778                        let _ = next_stmt_id.get_and_increment();
4779
4780                        ident_stack.push(receiver_stream_ident);
4781                    }
4782
4783                    HydroNode::Counter {
4784                        tag,
4785                        duration,
4786                        prefix,
4787                        ..
4788                    } => {
4789                        let input_ident = ident_stack.pop().unwrap();
4790
4791                        let counter_ident =
4792                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4793
4794                        match builders_or_callback {
4795                            BuildersOrCallback::Builders(graph_builders) => {
4796                                let arg = format!("{}({})", prefix, tag);
4797                                let builder = graph_builders.get_dfir_mut(&out_location);
4798                                builder.add_dfir(
4799                                    parse_quote! {
4800                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4801                                    },
4802                                    None,
4803                                    Some(&next_stmt_id.to_string()),
4804                                );
4805                            }
4806                            BuildersOrCallback::Callback(_, node_callback) => {
4807                                node_callback(node, next_stmt_id);
4808                            }
4809                        }
4810
4811                        let _ = next_stmt_id.get_and_increment();
4812
4813                        ident_stack.push(counter_ident);
4814                    }
4815                }
4816            },
4817            seen_tees,
4818            false,
4819        );
4820
4821        let ret = ident_stack
4822            .pop()
4823            .expect("ident_stack should have exactly one element after traversal");
4824        assert!(
4825            ident_stack.is_empty(),
4826            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4827             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4828            ident_stack.len()
4829        );
4830        ret
4831    }
4832
4833    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4834        match self {
4835            HydroNode::Placeholder => {
4836                panic!()
4837            }
4838            HydroNode::Cast { .. }
4839            | HydroNode::ObserveNonDet { .. }
4840            | HydroNode::UnboundSingleton { .. }
4841            | HydroNode::AssertIsConsistent { .. } => {}
4842            HydroNode::Source { source, .. } => match source {
4843                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4844                HydroSource::ExternalNetwork()
4845                | HydroSource::Spin()
4846                | HydroSource::ClusterMembers(_, _)
4847                | HydroSource::Embedded(_)
4848                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4849            },
4850            HydroNode::SingletonSource { value, .. } => {
4851                transform(value);
4852            }
4853            HydroNode::CycleSource { .. }
4854            | HydroNode::Tee { .. }
4855            | HydroNode::Reference { .. }
4856            | HydroNode::YieldConcat { .. }
4857            | HydroNode::BeginAtomic { .. }
4858            | HydroNode::EndAtomic { .. }
4859            | HydroNode::Batch { .. }
4860            | HydroNode::Chain { .. }
4861            | HydroNode::MergeOrdered { .. }
4862            | HydroNode::ChainFirst { .. }
4863            | HydroNode::CrossProduct { .. }
4864            | HydroNode::CrossSingleton { .. }
4865            | HydroNode::ResolveFutures { .. }
4866            | HydroNode::ResolveFuturesBlocking { .. }
4867            | HydroNode::ResolveFuturesOrdered { .. }
4868            | HydroNode::Join { .. }
4869            | HydroNode::JoinHalf { .. }
4870            | HydroNode::Difference { .. }
4871            | HydroNode::AntiJoin { .. }
4872            | HydroNode::DeferTick { .. }
4873            | HydroNode::Enumerate { .. }
4874            | HydroNode::Unique { .. }
4875            | HydroNode::Sort { .. } => {}
4876            HydroNode::Map { f, .. }
4877            | HydroNode::FlatMap { f, .. }
4878            | HydroNode::FlatMapStreamBlocking { f, .. }
4879            | HydroNode::Filter { f, .. }
4880            | HydroNode::FilterMap { f, .. }
4881            | HydroNode::Inspect { f, .. }
4882            | HydroNode::Partition { f, .. }
4883            | HydroNode::Reduce { f, .. }
4884            | HydroNode::ReduceKeyed { f, .. }
4885            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4886                transform(&mut f.expr);
4887            }
4888            HydroNode::Fold { init, acc, .. }
4889            | HydroNode::Scan { init, acc, .. }
4890            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4891            | HydroNode::FoldKeyed { init, acc, .. } => {
4892                transform(&mut init.expr);
4893                transform(&mut acc.expr);
4894            }
4895            HydroNode::Network {
4896                serialize_fn,
4897                deserialize_fn,
4898                ..
4899            } => {
4900                if let Some(serialize_fn) = serialize_fn {
4901                    transform(serialize_fn);
4902                }
4903                if let Some(deserialize_fn) = deserialize_fn {
4904                    transform(deserialize_fn);
4905                }
4906            }
4907            HydroNode::ExternalInput { deserialize_fn, .. } => {
4908                if let Some(deserialize_fn) = deserialize_fn {
4909                    transform(deserialize_fn);
4910                }
4911            }
4912            HydroNode::Counter { duration, .. } => {
4913                transform(duration);
4914            }
4915        }
4916    }
4917
4918    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4919        &self.metadata().op
4920    }
4921
4922    pub fn metadata(&self) -> &HydroIrMetadata {
4923        match self {
4924            HydroNode::Placeholder => {
4925                panic!()
4926            }
4927            HydroNode::Cast { metadata, .. }
4928            | HydroNode::ObserveNonDet { metadata, .. }
4929            | HydroNode::AssertIsConsistent { metadata, .. }
4930            | HydroNode::UnboundSingleton { metadata, .. }
4931            | HydroNode::Source { metadata, .. }
4932            | HydroNode::SingletonSource { metadata, .. }
4933            | HydroNode::CycleSource { metadata, .. }
4934            | HydroNode::Tee { metadata, .. }
4935            | HydroNode::Reference { metadata, .. }
4936            | HydroNode::Partition { metadata, .. }
4937            | HydroNode::YieldConcat { metadata, .. }
4938            | HydroNode::BeginAtomic { metadata, .. }
4939            | HydroNode::EndAtomic { metadata, .. }
4940            | HydroNode::Batch { metadata, .. }
4941            | HydroNode::Chain { metadata, .. }
4942            | HydroNode::MergeOrdered { metadata, .. }
4943            | HydroNode::ChainFirst { metadata, .. }
4944            | HydroNode::CrossProduct { metadata, .. }
4945            | HydroNode::CrossSingleton { metadata, .. }
4946            | HydroNode::Join { metadata, .. }
4947            | HydroNode::JoinHalf { metadata, .. }
4948            | HydroNode::Difference { metadata, .. }
4949            | HydroNode::AntiJoin { metadata, .. }
4950            | HydroNode::ResolveFutures { metadata, .. }
4951            | HydroNode::ResolveFuturesBlocking { metadata, .. }
4952            | HydroNode::ResolveFuturesOrdered { metadata, .. }
4953            | HydroNode::Map { metadata, .. }
4954            | HydroNode::FlatMap { metadata, .. }
4955            | HydroNode::FlatMapStreamBlocking { metadata, .. }
4956            | HydroNode::Filter { metadata, .. }
4957            | HydroNode::FilterMap { metadata, .. }
4958            | HydroNode::DeferTick { metadata, .. }
4959            | HydroNode::Enumerate { metadata, .. }
4960            | HydroNode::Inspect { metadata, .. }
4961            | HydroNode::Unique { metadata, .. }
4962            | HydroNode::Sort { metadata, .. }
4963            | HydroNode::Scan { metadata, .. }
4964            | HydroNode::ScanAsyncBlocking { metadata, .. }
4965            | HydroNode::Fold { metadata, .. }
4966            | HydroNode::FoldKeyed { metadata, .. }
4967            | HydroNode::Reduce { metadata, .. }
4968            | HydroNode::ReduceKeyed { metadata, .. }
4969            | HydroNode::ReduceKeyedWatermark { metadata, .. }
4970            | HydroNode::ExternalInput { metadata, .. }
4971            | HydroNode::Network { metadata, .. }
4972            | HydroNode::Counter { metadata, .. } => metadata,
4973        }
4974    }
4975
4976    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4977        &mut self.metadata_mut().op
4978    }
4979
4980    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4981        match self {
4982            HydroNode::Placeholder => {
4983                panic!()
4984            }
4985            HydroNode::Cast { metadata, .. }
4986            | HydroNode::ObserveNonDet { metadata, .. }
4987            | HydroNode::AssertIsConsistent { metadata, .. }
4988            | HydroNode::UnboundSingleton { metadata, .. }
4989            | HydroNode::Source { metadata, .. }
4990            | HydroNode::SingletonSource { metadata, .. }
4991            | HydroNode::CycleSource { metadata, .. }
4992            | HydroNode::Tee { metadata, .. }
4993            | HydroNode::Reference { metadata, .. }
4994            | HydroNode::Partition { metadata, .. }
4995            | HydroNode::YieldConcat { metadata, .. }
4996            | HydroNode::BeginAtomic { metadata, .. }
4997            | HydroNode::EndAtomic { metadata, .. }
4998            | HydroNode::Batch { metadata, .. }
4999            | HydroNode::Chain { metadata, .. }
5000            | HydroNode::MergeOrdered { metadata, .. }
5001            | HydroNode::ChainFirst { metadata, .. }
5002            | HydroNode::CrossProduct { metadata, .. }
5003            | HydroNode::CrossSingleton { metadata, .. }
5004            | HydroNode::Join { metadata, .. }
5005            | HydroNode::JoinHalf { metadata, .. }
5006            | HydroNode::Difference { metadata, .. }
5007            | HydroNode::AntiJoin { metadata, .. }
5008            | HydroNode::ResolveFutures { metadata, .. }
5009            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5010            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5011            | HydroNode::Map { metadata, .. }
5012            | HydroNode::FlatMap { metadata, .. }
5013            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5014            | HydroNode::Filter { metadata, .. }
5015            | HydroNode::FilterMap { metadata, .. }
5016            | HydroNode::DeferTick { metadata, .. }
5017            | HydroNode::Enumerate { metadata, .. }
5018            | HydroNode::Inspect { metadata, .. }
5019            | HydroNode::Unique { metadata, .. }
5020            | HydroNode::Sort { metadata, .. }
5021            | HydroNode::Scan { metadata, .. }
5022            | HydroNode::ScanAsyncBlocking { metadata, .. }
5023            | HydroNode::Fold { metadata, .. }
5024            | HydroNode::FoldKeyed { metadata, .. }
5025            | HydroNode::Reduce { metadata, .. }
5026            | HydroNode::ReduceKeyed { metadata, .. }
5027            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5028            | HydroNode::ExternalInput { metadata, .. }
5029            | HydroNode::Network { metadata, .. }
5030            | HydroNode::Counter { metadata, .. } => metadata,
5031        }
5032    }
5033
5034    pub fn input(&self) -> Vec<&HydroNode> {
5035        match self {
5036            HydroNode::Placeholder => {
5037                panic!()
5038            }
5039            HydroNode::Source { .. }
5040            | HydroNode::SingletonSource { .. }
5041            | HydroNode::ExternalInput { .. }
5042            | HydroNode::CycleSource { .. }
5043            | HydroNode::Tee { .. }
5044            | HydroNode::Reference { .. }
5045            | HydroNode::Partition { .. } => {
5046                // Tee/Partition should find their input in separate special ways
5047                vec![]
5048            }
5049            HydroNode::Cast { inner, .. }
5050            | HydroNode::ObserveNonDet { inner, .. }
5051            | HydroNode::YieldConcat { inner, .. }
5052            | HydroNode::BeginAtomic { inner, .. }
5053            | HydroNode::EndAtomic { inner, .. }
5054            | HydroNode::Batch { inner, .. }
5055            | HydroNode::UnboundSingleton { inner, .. }
5056            | HydroNode::AssertIsConsistent { inner, .. } => {
5057                vec![inner]
5058            }
5059            HydroNode::Chain { first, second, .. } => {
5060                vec![first, second]
5061            }
5062            HydroNode::MergeOrdered { first, second, .. } => {
5063                vec![first, second]
5064            }
5065            HydroNode::ChainFirst { first, second, .. } => {
5066                vec![first, second]
5067            }
5068            HydroNode::CrossProduct { left, right, .. }
5069            | HydroNode::CrossSingleton { left, right, .. }
5070            | HydroNode::Join { left, right, .. }
5071            | HydroNode::JoinHalf { left, right, .. } => {
5072                vec![left, right]
5073            }
5074            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5075                vec![pos, neg]
5076            }
5077            HydroNode::Map { input, .. }
5078            | HydroNode::FlatMap { input, .. }
5079            | HydroNode::FlatMapStreamBlocking { input, .. }
5080            | HydroNode::Filter { input, .. }
5081            | HydroNode::FilterMap { input, .. }
5082            | HydroNode::Sort { input, .. }
5083            | HydroNode::DeferTick { input, .. }
5084            | HydroNode::Enumerate { input, .. }
5085            | HydroNode::Inspect { input, .. }
5086            | HydroNode::Unique { input, .. }
5087            | HydroNode::Network { input, .. }
5088            | HydroNode::Counter { input, .. }
5089            | HydroNode::ResolveFutures { input, .. }
5090            | HydroNode::ResolveFuturesBlocking { input, .. }
5091            | HydroNode::ResolveFuturesOrdered { input, .. }
5092            | HydroNode::Fold { input, .. }
5093            | HydroNode::FoldKeyed { input, .. }
5094            | HydroNode::Reduce { input, .. }
5095            | HydroNode::ReduceKeyed { input, .. }
5096            | HydroNode::Scan { input, .. }
5097            | HydroNode::ScanAsyncBlocking { input, .. } => {
5098                vec![input]
5099            }
5100            HydroNode::ReduceKeyedWatermark {
5101                input, watermark, ..
5102            } => {
5103                vec![input, watermark]
5104            }
5105        }
5106    }
5107
5108    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5109        self.input()
5110            .iter()
5111            .map(|input_node| input_node.metadata())
5112            .collect()
5113    }
5114
5115    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5116    /// has other live references, meaning the upstream is already driven
5117    /// by another consumer and does not need a Null sink.
5118    pub fn is_shared_with_others(&self) -> bool {
5119        match self {
5120            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5121                Rc::strong_count(&inner.0) > 1
5122            }
5123            // A zero-output reference node is valid in DFIR (it drains itself at
5124            // end of tick), so it doesn't need to be driven by another consumer.
5125            HydroNode::Reference { .. } => false,
5126            _ => false,
5127        }
5128    }
5129
5130    pub fn print_root(&self) -> String {
5131        match self {
5132            HydroNode::Placeholder => {
5133                panic!()
5134            }
5135            HydroNode::Cast { .. } => "Cast()".to_owned(),
5136            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5137            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5138            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5139            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5140            HydroNode::SingletonSource {
5141                value,
5142                first_tick_only,
5143                ..
5144            } => format!(
5145                "SingletonSource({:?}, first_tick_only={})",
5146                value, first_tick_only
5147            ),
5148            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5149            HydroNode::Tee { inner, .. } => {
5150                format!("Tee({})", inner.0.borrow().print_root())
5151            }
5152            HydroNode::Reference { inner, kind, .. } => {
5153                format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5154            }
5155            HydroNode::Partition { f, is_true, .. } => {
5156                format!("Partition({:?}, is_true={})", f, is_true)
5157            }
5158            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5159            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5160            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5161            HydroNode::Batch { .. } => "Batch()".to_owned(),
5162            HydroNode::Chain { first, second, .. } => {
5163                format!("Chain({}, {})", first.print_root(), second.print_root())
5164            }
5165            HydroNode::MergeOrdered { first, second, .. } => {
5166                format!(
5167                    "MergeOrdered({}, {})",
5168                    first.print_root(),
5169                    second.print_root()
5170                )
5171            }
5172            HydroNode::ChainFirst { first, second, .. } => {
5173                format!(
5174                    "ChainFirst({}, {})",
5175                    first.print_root(),
5176                    second.print_root()
5177                )
5178            }
5179            HydroNode::CrossProduct { left, right, .. } => {
5180                format!(
5181                    "CrossProduct({}, {})",
5182                    left.print_root(),
5183                    right.print_root()
5184                )
5185            }
5186            HydroNode::CrossSingleton { left, right, .. } => {
5187                format!(
5188                    "CrossSingleton({}, {})",
5189                    left.print_root(),
5190                    right.print_root()
5191                )
5192            }
5193            HydroNode::Join { left, right, .. } => {
5194                format!("Join({}, {})", left.print_root(), right.print_root())
5195            }
5196            HydroNode::JoinHalf { left, right, .. } => {
5197                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5198            }
5199            HydroNode::Difference { pos, neg, .. } => {
5200                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5201            }
5202            HydroNode::AntiJoin { pos, neg, .. } => {
5203                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5204            }
5205            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5206            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5207            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5208            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5209            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5210            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5211            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5212            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5213            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5214            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5215            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5216            HydroNode::Unique { .. } => "Unique()".to_owned(),
5217            HydroNode::Sort { .. } => "Sort()".to_owned(),
5218            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5219            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5220            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5221                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5222            }
5223            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5224            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5225            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5226            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5227            HydroNode::Network { .. } => "Network()".to_owned(),
5228            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5229            HydroNode::Counter { tag, duration, .. } => {
5230                format!("Counter({:?}, {:?})", tag, duration)
5231            }
5232        }
5233    }
5234}
5235
5236#[cfg(feature = "build")]
5237fn instantiate_network<'a, D>(
5238    env: &mut D::InstantiateEnv,
5239    from_location: &LocationId,
5240    to_location: &LocationId,
5241    processes: &SparseSecondaryMap<LocationKey, D::Process>,
5242    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
5243    name: Option<&str>,
5244    networking_info: &crate::networking::NetworkingInfo,
5245) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
5246where
5247    D: Deploy<'a>,
5248{
5249    let ((sink, source), connect_fn) = match (from_location, to_location) {
5250        (&LocationId::Process(from), &LocationId::Process(to)) => {
5251            let from_node = processes
5252                .get(from)
5253                .unwrap_or_else(|| {
5254                    panic!("A process used in the graph was not instantiated: {}", from)
5255                })
5256                .clone();
5257            let to_node = processes
5258                .get(to)
5259                .unwrap_or_else(|| {
5260                    panic!("A process used in the graph was not instantiated: {}", to)
5261                })
5262                .clone();
5263
5264            let sink_port = from_node.next_port();
5265            let source_port = to_node.next_port();
5266
5267            (
5268                D::o2o_sink_source(
5269                    env,
5270                    &from_node,
5271                    &sink_port,
5272                    &to_node,
5273                    &source_port,
5274                    name,
5275                    networking_info,
5276                ),
5277                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
5278            )
5279        }
5280        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
5281            let from_node = processes
5282                .get(from)
5283                .unwrap_or_else(|| {
5284                    panic!("A process used in the graph was not instantiated: {}", from)
5285                })
5286                .clone();
5287            let to_node = clusters
5288                .get(to)
5289                .unwrap_or_else(|| {
5290                    panic!("A cluster used in the graph was not instantiated: {}", to)
5291                })
5292                .clone();
5293
5294            let sink_port = from_node.next_port();
5295            let source_port = to_node.next_port();
5296
5297            (
5298                D::o2m_sink_source(
5299                    env,
5300                    &from_node,
5301                    &sink_port,
5302                    &to_node,
5303                    &source_port,
5304                    name,
5305                    networking_info,
5306                ),
5307                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
5308            )
5309        }
5310        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
5311            let from_node = clusters
5312                .get(from)
5313                .unwrap_or_else(|| {
5314                    panic!("A cluster used in the graph was not instantiated: {}", from)
5315                })
5316                .clone();
5317            let to_node = processes
5318                .get(to)
5319                .unwrap_or_else(|| {
5320                    panic!("A process used in the graph was not instantiated: {}", to)
5321                })
5322                .clone();
5323
5324            let sink_port = from_node.next_port();
5325            let source_port = to_node.next_port();
5326
5327            (
5328                D::m2o_sink_source(
5329                    env,
5330                    &from_node,
5331                    &sink_port,
5332                    &to_node,
5333                    &source_port,
5334                    name,
5335                    networking_info,
5336                ),
5337                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
5338            )
5339        }
5340        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
5341            let from_node = clusters
5342                .get(from)
5343                .unwrap_or_else(|| {
5344                    panic!("A cluster used in the graph was not instantiated: {}", from)
5345                })
5346                .clone();
5347            let to_node = clusters
5348                .get(to)
5349                .unwrap_or_else(|| {
5350                    panic!("A cluster used in the graph was not instantiated: {}", to)
5351                })
5352                .clone();
5353
5354            let sink_port = from_node.next_port();
5355            let source_port = to_node.next_port();
5356
5357            (
5358                D::m2m_sink_source(
5359                    env,
5360                    &from_node,
5361                    &sink_port,
5362                    &to_node,
5363                    &source_port,
5364                    name,
5365                    networking_info,
5366                ),
5367                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
5368            )
5369        }
5370        (LocationId::Tick(_, _), _) => panic!(),
5371        (_, LocationId::Tick(_, _)) => panic!(),
5372        (LocationId::Atomic(_), _) => panic!(),
5373        (_, LocationId::Atomic(_)) => panic!(),
5374    };
5375    (sink, source, connect_fn)
5376}
5377
5378#[cfg(test)]
5379mod serde_test;
5380
5381#[cfg(test)]
5382mod test {
5383    use std::mem::size_of;
5384
5385    use stageleft::{QuotedWithContext, q};
5386
5387    use super::*;
5388
5389    #[test]
5390    #[cfg_attr(
5391        not(feature = "build"),
5392        ignore = "expects inclusion of feature-gated fields"
5393    )]
5394    fn hydro_node_size() {
5395        assert_eq!(size_of::<HydroNode>(), 264);
5396    }
5397
5398    #[test]
5399    #[cfg_attr(
5400        not(feature = "build"),
5401        ignore = "expects inclusion of feature-gated fields"
5402    )]
5403    fn hydro_root_size() {
5404        assert_eq!(size_of::<HydroRoot>(), 136);
5405    }
5406
5407    #[test]
5408    fn test_simplify_q_macro_basic() {
5409        // Test basic non-q! expression
5410        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
5411        let result = simplify_q_macro(simple_expr.clone());
5412        assert_eq!(result, simple_expr);
5413    }
5414
5415    #[test]
5416    fn test_simplify_q_macro_actual_stageleft_call() {
5417        // Test a simplified version of what a real stageleft call might look like
5418        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
5419        let result = simplify_q_macro(stageleft_call);
5420        // This should be processed by our visitor and simplified to q!(...)
5421        // since we detect the stageleft::runtime_support::fn_* pattern
5422        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5423    }
5424
5425    #[test]
5426    fn test_closure_no_pipe_at_start() {
5427        // Test a closure that does not start with a pipe
5428        let stageleft_call = q!({
5429            let foo = 123;
5430            move |b: usize| b + foo
5431        })
5432        .splice_fn1_ctx(&());
5433        let result = simplify_q_macro(stageleft_call);
5434        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5435    }
5436}