Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26use crate::compile::builder::{CycleId, ExternalPortId};
27#[cfg(feature = "build")]
28use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
29use crate::location::dynamic::LocationId;
30use crate::location::{LocationKey, NetworkHint};
31
32pub mod backtrace;
33use backtrace::Backtrace;
34
35/// Wrapper that displays only the tokens of a parsed expr.
36///
37/// Boxes `syn::Type` which is ~240 bytes.
38#[derive(Clone, Hash)]
39pub struct DebugExpr(pub Box<syn::Expr>);
40
41impl From<syn::Expr> for DebugExpr {
42    fn from(expr: syn::Expr) -> Self {
43        Self(Box::new(expr))
44    }
45}
46
47impl Deref for DebugExpr {
48    type Target = syn::Expr;
49
50    fn deref(&self) -> &Self::Target {
51        &self.0
52    }
53}
54
55impl ToTokens for DebugExpr {
56    fn to_tokens(&self, tokens: &mut TokenStream) {
57        self.0.to_tokens(tokens);
58    }
59}
60
61impl Debug for DebugExpr {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        write!(f, "{}", self.0.to_token_stream())
64    }
65}
66
67impl Display for DebugExpr {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        let original = self.0.as_ref().clone();
70        let simplified = simplify_q_macro(original);
71
72        // For now, just use quote formatting without trying to parse as a statement
73        // This avoids the syn::parse_quote! issues entirely
74        write!(f, "q!({})", quote::quote!(#simplified))
75    }
76}
77
78/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
79fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
80    // Try to parse the token string as a syn::Expr
81    // Use a visitor to simplify q! macro expansions
82    let mut simplifier = QMacroSimplifier::new();
83    simplifier.visit_expr_mut(&mut expr);
84
85    // If we found and simplified a q! macro, return the simplified version
86    if let Some(simplified) = simplifier.simplified_result {
87        simplified
88    } else {
89        expr
90    }
91}
92
93/// AST visitor that simplifies q! macro expansions
94#[derive(Default)]
95pub struct QMacroSimplifier {
96    pub simplified_result: Option<syn::Expr>,
97}
98
99impl QMacroSimplifier {
100    pub fn new() -> Self {
101        Self::default()
102    }
103}
104
105impl VisitMut for QMacroSimplifier {
106    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
107        // Check if we already found a result to avoid further processing
108        if self.simplified_result.is_some() {
109            return;
110        }
111
112        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
113            // Look for calls to stageleft::runtime_support::fn*
114            && self.is_stageleft_runtime_support_call(&path_expr.path)
115            // Try to extract the closure from the arguments
116            && let Some(closure) = self.extract_closure_from_args(&call.args)
117        {
118            self.simplified_result = Some(closure);
119            return;
120        }
121
122        // Continue visiting child expressions using the default implementation
123        // Use the default visitor to avoid infinite recursion
124        syn::visit_mut::visit_expr_mut(self, expr);
125    }
126}
127
128impl QMacroSimplifier {
129    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
130        // Check if this is a call to stageleft::runtime_support::fn*
131        if let Some(last_segment) = path.segments.last() {
132            let fn_name = last_segment.ident.to_string();
133            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
134            fn_name.contains("_type_hint")
135                && path.segments.len() > 2
136                && path.segments[0].ident == "stageleft"
137                && path.segments[1].ident == "runtime_support"
138        } else {
139            false
140        }
141    }
142
143    fn extract_closure_from_args(
144        &self,
145        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
146    ) -> Option<syn::Expr> {
147        // Look through the arguments for a closure expression
148        for arg in args {
149            if let syn::Expr::Closure(_) = arg {
150                return Some(arg.clone());
151            }
152            // Also check for closures nested in other expressions (like blocks)
153            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
154                return Some(closure_expr);
155            }
156        }
157        None
158    }
159
160    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
161        let mut visitor = ClosureFinder {
162            found_closure: None,
163            prefer_inner_blocks: true,
164        };
165        visitor.visit_expr(expr);
166        visitor.found_closure
167    }
168}
169
170/// Visitor that finds closures in expressions with special block handling
171struct ClosureFinder {
172    found_closure: Option<syn::Expr>,
173    prefer_inner_blocks: bool,
174}
175
176impl<'ast> Visit<'ast> for ClosureFinder {
177    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
178        // If we already found a closure, don't continue searching
179        if self.found_closure.is_some() {
180            return;
181        }
182
183        match expr {
184            syn::Expr::Closure(_) => {
185                self.found_closure = Some(expr.clone());
186            }
187            syn::Expr::Block(block) if self.prefer_inner_blocks => {
188                // Special handling for blocks - look for inner blocks that contain closures
189                for stmt in &block.block.stmts {
190                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
191                        && let syn::Expr::Block(_) = stmt_expr
192                    {
193                        // Check if this nested block contains a closure
194                        let mut inner_visitor = ClosureFinder {
195                            found_closure: None,
196                            prefer_inner_blocks: false, // Avoid infinite recursion
197                        };
198                        inner_visitor.visit_expr(stmt_expr);
199                        if inner_visitor.found_closure.is_some() {
200                            // Found a closure in an inner block, return that block
201                            self.found_closure = Some(stmt_expr.clone());
202                            return;
203                        }
204                    }
205                }
206
207                // If no inner block with closure found, continue with normal visitation
208                visit::visit_expr(self, expr);
209
210                // If we found a closure, just return the closure itself, not the whole block
211                // unless we're in the special case where we want the containing block
212                if self.found_closure.is_some() {
213                    // The closure was found during visitation, no need to wrap in block
214                }
215            }
216            _ => {
217                // Use default visitor behavior for all other expressions
218                visit::visit_expr(self, expr);
219            }
220        }
221    }
222}
223
224/// Debug displays the type's tokens.
225///
226/// Boxes `syn::Type` which is ~320 bytes.
227#[derive(Clone, PartialEq, Eq, Hash)]
228pub struct DebugType(pub Box<syn::Type>);
229
230impl From<syn::Type> for DebugType {
231    fn from(t: syn::Type) -> Self {
232        Self(Box::new(t))
233    }
234}
235
236impl Deref for DebugType {
237    type Target = syn::Type;
238
239    fn deref(&self) -> &Self::Target {
240        &self.0
241    }
242}
243
244impl ToTokens for DebugType {
245    fn to_tokens(&self, tokens: &mut TokenStream) {
246        self.0.to_tokens(tokens);
247    }
248}
249
250impl Debug for DebugType {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        write!(f, "{}", self.0.to_token_stream())
253    }
254}
255
256pub enum DebugInstantiate {
257    Building,
258    Finalized(Box<DebugInstantiateFinalized>),
259}
260
261#[cfg_attr(
262    not(feature = "build"),
263    expect(
264        dead_code,
265        reason = "sink, source unused without `feature = \"build\"`."
266    )
267)]
268pub struct DebugInstantiateFinalized {
269    sink: syn::Expr,
270    source: syn::Expr,
271    connect_fn: Option<Box<dyn FnOnce()>>,
272}
273
274impl From<DebugInstantiateFinalized> for DebugInstantiate {
275    fn from(f: DebugInstantiateFinalized) -> Self {
276        Self::Finalized(Box::new(f))
277    }
278}
279
280impl Debug for DebugInstantiate {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        write!(f, "<network instantiate>")
283    }
284}
285
286impl Hash for DebugInstantiate {
287    fn hash<H: Hasher>(&self, _state: &mut H) {
288        // Do nothing
289    }
290}
291
292impl Clone for DebugInstantiate {
293    fn clone(&self) -> Self {
294        match self {
295            DebugInstantiate::Building => DebugInstantiate::Building,
296            DebugInstantiate::Finalized(_) => {
297                panic!("DebugInstantiate::Finalized should not be cloned")
298            }
299        }
300    }
301}
302
303/// Tracks the instantiation state of a `ClusterMembers` source.
304///
305/// During `compile_network`, the first `ClusterMembers` node for a given
306/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
307/// receives the expression returned by `Deploy::cluster_membership_stream`.
308/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
309/// during code-gen they simply reference the tee output of the first node
310/// instead of creating a redundant `source_stream`.
311#[derive(Debug, Hash, Clone)]
312pub enum ClusterMembersState {
313    /// Not yet instantiated.
314    Uninit,
315    /// The primary instance: holds the stream expression and will emit
316    /// `source_stream(expr) -> tee()` during code-gen.
317    Stream(DebugExpr),
318    /// A secondary instance that references the tee output of the primary.
319    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
320    /// can derive the deterministic tee ident without extra state.
321    Tee(LocationId, LocationId),
322}
323
324/// A source in a Hydro graph, where data enters the graph.
325#[derive(Debug, Hash, Clone)]
326pub enum HydroSource {
327    Stream(DebugExpr),
328    ExternalNetwork(),
329    Iter(DebugExpr),
330    Spin(),
331    ClusterMembers(LocationId, ClusterMembersState),
332    Embedded(syn::Ident),
333}
334
335#[cfg(feature = "build")]
336/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
337/// and simulations.
338///
339/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
340/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
341pub trait DfirBuilder {
342    /// Whether the representation of singletons should include intermediate states.
343    fn singleton_intermediates(&self) -> bool;
344
345    /// Gets the DFIR builder for the given location, creating it if necessary.
346    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
347
348    fn batch(
349        &mut self,
350        in_ident: syn::Ident,
351        in_location: &LocationId,
352        in_kind: &CollectionKind,
353        out_ident: &syn::Ident,
354        out_location: &LocationId,
355        op_meta: &HydroIrOpMetadata,
356    );
357    fn yield_from_tick(
358        &mut self,
359        in_ident: syn::Ident,
360        in_location: &LocationId,
361        in_kind: &CollectionKind,
362        out_ident: &syn::Ident,
363        out_location: &LocationId,
364    );
365
366    fn begin_atomic(
367        &mut self,
368        in_ident: syn::Ident,
369        in_location: &LocationId,
370        in_kind: &CollectionKind,
371        out_ident: &syn::Ident,
372        out_location: &LocationId,
373        op_meta: &HydroIrOpMetadata,
374    );
375    fn end_atomic(
376        &mut self,
377        in_ident: syn::Ident,
378        in_location: &LocationId,
379        in_kind: &CollectionKind,
380        out_ident: &syn::Ident,
381    );
382
383    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
384    fn observe_nondet(
385        &mut self,
386        trusted: bool,
387        location: &LocationId,
388        in_ident: syn::Ident,
389        in_kind: &CollectionKind,
390        out_ident: &syn::Ident,
391        out_kind: &CollectionKind,
392        op_meta: &HydroIrOpMetadata,
393    );
394
395    #[expect(clippy::too_many_arguments, reason = "TODO")]
396    fn create_network(
397        &mut self,
398        from: &LocationId,
399        to: &LocationId,
400        input_ident: syn::Ident,
401        out_ident: &syn::Ident,
402        serialize: Option<&DebugExpr>,
403        sink: syn::Expr,
404        source: syn::Expr,
405        deserialize: Option<&DebugExpr>,
406        tag_id: usize,
407        networking_info: &crate::networking::NetworkingInfo,
408    );
409
410    fn create_external_source(
411        &mut self,
412        on: &LocationId,
413        source_expr: syn::Expr,
414        out_ident: &syn::Ident,
415        deserialize: Option<&DebugExpr>,
416        tag_id: usize,
417    );
418
419    fn create_external_output(
420        &mut self,
421        on: &LocationId,
422        sink_expr: syn::Expr,
423        input_ident: &syn::Ident,
424        serialize: Option<&DebugExpr>,
425        tag_id: usize,
426    );
427}
428
429#[cfg(feature = "build")]
430impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
431    fn singleton_intermediates(&self) -> bool {
432        false
433    }
434
435    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
436        self.entry(location.root().key())
437            .expect("location was removed")
438            .or_default()
439    }
440
441    fn batch(
442        &mut self,
443        in_ident: syn::Ident,
444        in_location: &LocationId,
445        in_kind: &CollectionKind,
446        out_ident: &syn::Ident,
447        _out_location: &LocationId,
448        _op_meta: &HydroIrOpMetadata,
449    ) {
450        let builder = self.get_dfir_mut(in_location.root());
451        if in_kind.is_bounded()
452            && matches!(
453                in_kind,
454                CollectionKind::Singleton { .. }
455                    | CollectionKind::Optional { .. }
456                    | CollectionKind::KeyedSingleton { .. }
457            )
458        {
459            assert!(in_location.is_top_level());
460            builder.add_dfir(
461                parse_quote! {
462                    #out_ident = #in_ident -> persist::<'static>();
463                },
464                None,
465                None,
466            );
467        } else {
468            builder.add_dfir(
469                parse_quote! {
470                    #out_ident = #in_ident;
471                },
472                None,
473                None,
474            );
475        }
476    }
477
478    fn yield_from_tick(
479        &mut self,
480        in_ident: syn::Ident,
481        in_location: &LocationId,
482        _in_kind: &CollectionKind,
483        out_ident: &syn::Ident,
484        _out_location: &LocationId,
485    ) {
486        let builder = self.get_dfir_mut(in_location.root());
487        builder.add_dfir(
488            parse_quote! {
489                #out_ident = #in_ident;
490            },
491            None,
492            None,
493        );
494    }
495
496    fn begin_atomic(
497        &mut self,
498        in_ident: syn::Ident,
499        in_location: &LocationId,
500        _in_kind: &CollectionKind,
501        out_ident: &syn::Ident,
502        _out_location: &LocationId,
503        _op_meta: &HydroIrOpMetadata,
504    ) {
505        let builder = self.get_dfir_mut(in_location.root());
506        builder.add_dfir(
507            parse_quote! {
508                #out_ident = #in_ident;
509            },
510            None,
511            None,
512        );
513    }
514
515    fn end_atomic(
516        &mut self,
517        in_ident: syn::Ident,
518        in_location: &LocationId,
519        _in_kind: &CollectionKind,
520        out_ident: &syn::Ident,
521    ) {
522        let builder = self.get_dfir_mut(in_location.root());
523        builder.add_dfir(
524            parse_quote! {
525                #out_ident = #in_ident;
526            },
527            None,
528            None,
529        );
530    }
531
532    fn observe_nondet(
533        &mut self,
534        _trusted: bool,
535        location: &LocationId,
536        in_ident: syn::Ident,
537        _in_kind: &CollectionKind,
538        out_ident: &syn::Ident,
539        _out_kind: &CollectionKind,
540        _op_meta: &HydroIrOpMetadata,
541    ) {
542        let builder = self.get_dfir_mut(location);
543        builder.add_dfir(
544            parse_quote! {
545                #out_ident = #in_ident;
546            },
547            None,
548            None,
549        );
550    }
551
552    fn create_network(
553        &mut self,
554        from: &LocationId,
555        to: &LocationId,
556        input_ident: syn::Ident,
557        out_ident: &syn::Ident,
558        serialize: Option<&DebugExpr>,
559        sink: syn::Expr,
560        source: syn::Expr,
561        deserialize: Option<&DebugExpr>,
562        tag_id: usize,
563        _networking_info: &crate::networking::NetworkingInfo,
564    ) {
565        let sender_builder = self.get_dfir_mut(from);
566        if let Some(serialize_pipeline) = serialize {
567            sender_builder.add_dfir(
568                parse_quote! {
569                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
570                },
571                None,
572                // operator tag separates send and receive, which otherwise have the same next_stmt_id
573                Some(&format!("send{}", tag_id)),
574            );
575        } else {
576            sender_builder.add_dfir(
577                parse_quote! {
578                    #input_ident -> dest_sink(#sink);
579                },
580                None,
581                Some(&format!("send{}", tag_id)),
582            );
583        }
584
585        let receiver_builder = self.get_dfir_mut(to);
586        if let Some(deserialize_pipeline) = deserialize {
587            receiver_builder.add_dfir(
588                parse_quote! {
589                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
590                },
591                None,
592                Some(&format!("recv{}", tag_id)),
593            );
594        } else {
595            receiver_builder.add_dfir(
596                parse_quote! {
597                    #out_ident = source_stream(#source);
598                },
599                None,
600                Some(&format!("recv{}", tag_id)),
601            );
602        }
603    }
604
605    fn create_external_source(
606        &mut self,
607        on: &LocationId,
608        source_expr: syn::Expr,
609        out_ident: &syn::Ident,
610        deserialize: Option<&DebugExpr>,
611        tag_id: usize,
612    ) {
613        let receiver_builder = self.get_dfir_mut(on);
614        if let Some(deserialize_pipeline) = deserialize {
615            receiver_builder.add_dfir(
616                parse_quote! {
617                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
618                },
619                None,
620                Some(&format!("recv{}", tag_id)),
621            );
622        } else {
623            receiver_builder.add_dfir(
624                parse_quote! {
625                    #out_ident = source_stream(#source_expr);
626                },
627                None,
628                Some(&format!("recv{}", tag_id)),
629            );
630        }
631    }
632
633    fn create_external_output(
634        &mut self,
635        on: &LocationId,
636        sink_expr: syn::Expr,
637        input_ident: &syn::Ident,
638        serialize: Option<&DebugExpr>,
639        tag_id: usize,
640    ) {
641        let sender_builder = self.get_dfir_mut(on);
642        if let Some(serialize_fn) = serialize {
643            sender_builder.add_dfir(
644                parse_quote! {
645                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
646                },
647                None,
648                // operator tag separates send and receive, which otherwise have the same next_stmt_id
649                Some(&format!("send{}", tag_id)),
650            );
651        } else {
652            sender_builder.add_dfir(
653                parse_quote! {
654                    #input_ident -> dest_sink(#sink_expr);
655                },
656                None,
657                Some(&format!("send{}", tag_id)),
658            );
659        }
660    }
661}
662
663#[cfg(feature = "build")]
664pub enum BuildersOrCallback<'a, L, N>
665where
666    L: FnMut(&mut HydroRoot, &mut usize),
667    N: FnMut(&mut HydroNode, &mut usize),
668{
669    Builders(&'a mut dyn DfirBuilder),
670    Callback(L, N),
671}
672
673/// An root in a Hydro graph, which is an pipeline that doesn't emit
674/// any downstream values. Traversals over the dataflow graph and
675/// generating DFIR IR start from roots.
676#[derive(Debug, Hash)]
677pub enum HydroRoot {
678    ForEach {
679        f: DebugExpr,
680        input: Box<HydroNode>,
681        op_metadata: HydroIrOpMetadata,
682    },
683    SendExternal {
684        to_external_key: LocationKey,
685        to_port_id: ExternalPortId,
686        to_many: bool,
687        unpaired: bool,
688        serialize_fn: Option<DebugExpr>,
689        instantiate_fn: DebugInstantiate,
690        input: Box<HydroNode>,
691        op_metadata: HydroIrOpMetadata,
692    },
693    DestSink {
694        sink: DebugExpr,
695        input: Box<HydroNode>,
696        op_metadata: HydroIrOpMetadata,
697    },
698    CycleSink {
699        cycle_id: CycleId,
700        input: Box<HydroNode>,
701        op_metadata: HydroIrOpMetadata,
702    },
703    EmbeddedOutput {
704        ident: syn::Ident,
705        input: Box<HydroNode>,
706        op_metadata: HydroIrOpMetadata,
707    },
708}
709
710impl HydroRoot {
711    #[cfg(feature = "build")]
712    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
713    pub fn compile_network<'a, D>(
714        &mut self,
715        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
716        seen_tees: &mut SeenTees,
717        seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
718        processes: &SparseSecondaryMap<LocationKey, D::Process>,
719        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
720        externals: &SparseSecondaryMap<LocationKey, D::External>,
721        env: &mut D::InstantiateEnv,
722    ) where
723        D: Deploy<'a>,
724    {
725        let refcell_extra_stmts = RefCell::new(extra_stmts);
726        let refcell_env = RefCell::new(env);
727        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
728        self.transform_bottom_up(
729            &mut |l| {
730                if let HydroRoot::SendExternal {
731                    input,
732                    to_external_key,
733                    to_port_id,
734                    to_many,
735                    unpaired,
736                    instantiate_fn,
737                    ..
738                } = l
739                {
740                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
741                        DebugInstantiate::Building => {
742                            let to_node = externals
743                                .get(*to_external_key)
744                                .unwrap_or_else(|| {
745                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
746                                })
747                                .clone();
748
749                            match input.metadata().location_id.root() {
750                                &LocationId::Process(process_key) => {
751                                    if *to_many {
752                                        (
753                                            (
754                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
755                                                parse_quote!(DUMMY),
756                                            ),
757                                            Box::new(|| {}) as Box<dyn FnOnce()>,
758                                        )
759                                    } else {
760                                        let from_node = processes
761                                            .get(process_key)
762                                            .unwrap_or_else(|| {
763                                                panic!("A process used in the graph was not instantiated: {}", process_key)
764                                            })
765                                            .clone();
766
767                                        let sink_port = from_node.next_port();
768                                        let source_port = to_node.next_port();
769
770                                        if *unpaired {
771                                            use stageleft::quote_type;
772                                            use tokio_util::codec::LengthDelimitedCodec;
773
774                                            to_node.register(*to_port_id, source_port.clone());
775
776                                            let _ = D::e2o_source(
777                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
778                                                &to_node, &source_port,
779                                                &from_node, &sink_port,
780                                                &quote_type::<LengthDelimitedCodec>(),
781                                                format!("{}_{}", *to_external_key, *to_port_id)
782                                            );
783                                        }
784
785                                        (
786                                            (
787                                                D::o2e_sink(
788                                                    &from_node,
789                                                    &sink_port,
790                                                    &to_node,
791                                                    &source_port,
792                                                    format!("{}_{}", *to_external_key, *to_port_id)
793                                                ),
794                                                parse_quote!(DUMMY),
795                                            ),
796                                            if *unpaired {
797                                                D::e2o_connect(
798                                                    &to_node,
799                                                    &source_port,
800                                                    &from_node,
801                                                    &sink_port,
802                                                    *to_many,
803                                                    NetworkHint::Auto,
804                                                )
805                                            } else {
806                                                Box::new(|| {}) as Box<dyn FnOnce()>
807                                            },
808                                        )
809                                    }
810                                }
811                                LocationId::Cluster(_) => todo!(),
812                                _ => panic!()
813                            }
814                        },
815
816                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
817                    };
818
819                    *instantiate_fn = DebugInstantiateFinalized {
820                        sink: sink_expr,
821                        source: source_expr,
822                        connect_fn: Some(connect_fn),
823                    }
824                    .into();
825                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
826                    let element_type = match &input.metadata().collection_kind {
827                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
828                        _ => panic!("Embedded output must have Stream collection kind"),
829                    };
830                    let location_key = match input.metadata().location_id.root() {
831                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
832                        _ => panic!("Embedded output must be on a process or cluster"),
833                    };
834                    D::register_embedded_output(
835                        &mut refcell_env.borrow_mut(),
836                        location_key,
837                        ident,
838                        &element_type,
839                    );
840                }
841            },
842            &mut |n| {
843                if let HydroNode::Network {
844                    name,
845                    networking_info,
846                    input,
847                    instantiate_fn,
848                    metadata,
849                    ..
850                } = n
851                {
852                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
853                        DebugInstantiate::Building => instantiate_network::<D>(
854                            &mut refcell_env.borrow_mut(),
855                            input.metadata().location_id.root(),
856                            metadata.location_id.root(),
857                            processes,
858                            clusters,
859                            name.as_deref(),
860                            networking_info,
861                        ),
862
863                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
864                    };
865
866                    *instantiate_fn = DebugInstantiateFinalized {
867                        sink: sink_expr,
868                        source: source_expr,
869                        connect_fn: Some(connect_fn),
870                    }
871                    .into();
872                } else if let HydroNode::ExternalInput {
873                    from_external_key,
874                    from_port_id,
875                    from_many,
876                    codec_type,
877                    port_hint,
878                    instantiate_fn,
879                    metadata,
880                    ..
881                } = n
882                {
883                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
884                        DebugInstantiate::Building => {
885                            let from_node = externals
886                                .get(*from_external_key)
887                                .unwrap_or_else(|| {
888                                    panic!(
889                                        "A external used in the graph was not instantiated: {}",
890                                        from_external_key,
891                                    )
892                                })
893                                .clone();
894
895                            match metadata.location_id.root() {
896                                &LocationId::Process(process_key) => {
897                                    let to_node = processes
898                                        .get(process_key)
899                                        .unwrap_or_else(|| {
900                                            panic!("A process used in the graph was not instantiated: {}", process_key)
901                                        })
902                                        .clone();
903
904                                    let sink_port = from_node.next_port();
905                                    let source_port = to_node.next_port();
906
907                                    from_node.register(*from_port_id, sink_port.clone());
908
909                                    (
910                                        (
911                                            parse_quote!(DUMMY),
912                                            if *from_many {
913                                                D::e2o_many_source(
914                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
915                                                    &to_node, &source_port,
916                                                    codec_type.0.as_ref(),
917                                                    format!("{}_{}", *from_external_key, *from_port_id)
918                                                )
919                                            } else {
920                                                D::e2o_source(
921                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
922                                                    &from_node, &sink_port,
923                                                    &to_node, &source_port,
924                                                    codec_type.0.as_ref(),
925                                                    format!("{}_{}", *from_external_key, *from_port_id)
926                                                )
927                                            },
928                                        ),
929                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
930                                    )
931                                }
932                                LocationId::Cluster(_) => todo!(),
933                                _ => panic!()
934                            }
935                        },
936
937                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
938                    };
939
940                    *instantiate_fn = DebugInstantiateFinalized {
941                        sink: sink_expr,
942                        source: source_expr,
943                        connect_fn: Some(connect_fn),
944                    }
945                    .into();
946                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
947                    let element_type = match &metadata.collection_kind {
948                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
949                        _ => panic!("Embedded source must have Stream collection kind"),
950                    };
951                    let location_key = match metadata.location_id.root() {
952                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
953                        _ => panic!("Embedded source must be on a process or cluster"),
954                    };
955                    D::register_embedded_input(
956                        &mut refcell_env.borrow_mut(),
957                        location_key,
958                        ident,
959                        &element_type,
960                    );
961                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
962                    match state {
963                        ClusterMembersState::Uninit => {
964                            let at_location = metadata.location_id.root().clone();
965                            let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
966                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
967                                // First occurrence: call cluster_membership_stream and mark as Stream.
968                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
969                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
970                                    &(),
971                                );
972                                *state = ClusterMembersState::Stream(expr.into());
973                            } else {
974                                // Already instantiated for this (at, target) pair: just tee.
975                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
976                            }
977                        }
978                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
979                            panic!("cluster members already finalized");
980                        }
981                    }
982                }
983            },
984            seen_tees,
985            false,
986        );
987    }
988
989    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
990        self.transform_bottom_up(
991            &mut |l| {
992                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
993                    match instantiate_fn {
994                        DebugInstantiate::Building => panic!("network not built"),
995
996                        DebugInstantiate::Finalized(finalized) => {
997                            (finalized.connect_fn.take().unwrap())();
998                        }
999                    }
1000                }
1001            },
1002            &mut |n| {
1003                if let HydroNode::Network { instantiate_fn, .. }
1004                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1005                {
1006                    match instantiate_fn {
1007                        DebugInstantiate::Building => panic!("network not built"),
1008
1009                        DebugInstantiate::Finalized(finalized) => {
1010                            (finalized.connect_fn.take().unwrap())();
1011                        }
1012                    }
1013                }
1014            },
1015            seen_tees,
1016            false,
1017        );
1018    }
1019
1020    pub fn transform_bottom_up(
1021        &mut self,
1022        transform_root: &mut impl FnMut(&mut HydroRoot),
1023        transform_node: &mut impl FnMut(&mut HydroNode),
1024        seen_tees: &mut SeenTees,
1025        check_well_formed: bool,
1026    ) {
1027        self.transform_children(
1028            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1029            seen_tees,
1030        );
1031
1032        transform_root(self);
1033    }
1034
1035    pub fn transform_children(
1036        &mut self,
1037        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1038        seen_tees: &mut SeenTees,
1039    ) {
1040        match self {
1041            HydroRoot::ForEach { input, .. }
1042            | HydroRoot::SendExternal { input, .. }
1043            | HydroRoot::DestSink { input, .. }
1044            | HydroRoot::CycleSink { input, .. }
1045            | HydroRoot::EmbeddedOutput { input, .. } => {
1046                transform(input, seen_tees);
1047            }
1048        }
1049    }
1050
1051    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
1052        match self {
1053            HydroRoot::ForEach {
1054                f,
1055                input,
1056                op_metadata,
1057            } => HydroRoot::ForEach {
1058                f: f.clone(),
1059                input: Box::new(input.deep_clone(seen_tees)),
1060                op_metadata: op_metadata.clone(),
1061            },
1062            HydroRoot::SendExternal {
1063                to_external_key,
1064                to_port_id,
1065                to_many,
1066                unpaired,
1067                serialize_fn,
1068                instantiate_fn,
1069                input,
1070                op_metadata,
1071            } => HydroRoot::SendExternal {
1072                to_external_key: *to_external_key,
1073                to_port_id: *to_port_id,
1074                to_many: *to_many,
1075                unpaired: *unpaired,
1076                serialize_fn: serialize_fn.clone(),
1077                instantiate_fn: instantiate_fn.clone(),
1078                input: Box::new(input.deep_clone(seen_tees)),
1079                op_metadata: op_metadata.clone(),
1080            },
1081            HydroRoot::DestSink {
1082                sink,
1083                input,
1084                op_metadata,
1085            } => HydroRoot::DestSink {
1086                sink: sink.clone(),
1087                input: Box::new(input.deep_clone(seen_tees)),
1088                op_metadata: op_metadata.clone(),
1089            },
1090            HydroRoot::CycleSink {
1091                cycle_id,
1092                input,
1093                op_metadata,
1094            } => HydroRoot::CycleSink {
1095                cycle_id: *cycle_id,
1096                input: Box::new(input.deep_clone(seen_tees)),
1097                op_metadata: op_metadata.clone(),
1098            },
1099            HydroRoot::EmbeddedOutput {
1100                ident,
1101                input,
1102                op_metadata,
1103            } => HydroRoot::EmbeddedOutput {
1104                ident: ident.clone(),
1105                input: Box::new(input.deep_clone(seen_tees)),
1106                op_metadata: op_metadata.clone(),
1107            },
1108        }
1109    }
1110
1111    #[cfg(feature = "build")]
1112    pub fn emit(
1113        &mut self,
1114        graph_builders: &mut dyn DfirBuilder,
1115        seen_tees: &mut SeenTees,
1116        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1117        next_stmt_id: &mut usize,
1118    ) {
1119        self.emit_core(
1120            &mut BuildersOrCallback::<
1121                fn(&mut HydroRoot, &mut usize),
1122                fn(&mut HydroNode, &mut usize),
1123            >::Builders(graph_builders),
1124            seen_tees,
1125            built_tees,
1126            next_stmt_id,
1127        );
1128    }
1129
1130    #[cfg(feature = "build")]
1131    pub fn emit_core(
1132        &mut self,
1133        builders_or_callback: &mut BuildersOrCallback<
1134            impl FnMut(&mut HydroRoot, &mut usize),
1135            impl FnMut(&mut HydroNode, &mut usize),
1136        >,
1137        seen_tees: &mut SeenTees,
1138        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1139        next_stmt_id: &mut usize,
1140    ) {
1141        match self {
1142            HydroRoot::ForEach { f, input, .. } => {
1143                let input_ident =
1144                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1145
1146                match builders_or_callback {
1147                    BuildersOrCallback::Builders(graph_builders) => {
1148                        graph_builders
1149                            .get_dfir_mut(&input.metadata().location_id)
1150                            .add_dfir(
1151                                parse_quote! {
1152                                    #input_ident -> for_each(#f);
1153                                },
1154                                None,
1155                                Some(&next_stmt_id.to_string()),
1156                            );
1157                    }
1158                    BuildersOrCallback::Callback(leaf_callback, _) => {
1159                        leaf_callback(self, next_stmt_id);
1160                    }
1161                }
1162
1163                *next_stmt_id += 1;
1164            }
1165
1166            HydroRoot::SendExternal {
1167                serialize_fn,
1168                instantiate_fn,
1169                input,
1170                ..
1171            } => {
1172                let input_ident =
1173                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1174
1175                match builders_or_callback {
1176                    BuildersOrCallback::Builders(graph_builders) => {
1177                        let (sink_expr, _) = match instantiate_fn {
1178                            DebugInstantiate::Building => (
1179                                syn::parse_quote!(DUMMY_SINK),
1180                                syn::parse_quote!(DUMMY_SOURCE),
1181                            ),
1182
1183                            DebugInstantiate::Finalized(finalized) => {
1184                                (finalized.sink.clone(), finalized.source.clone())
1185                            }
1186                        };
1187
1188                        graph_builders.create_external_output(
1189                            &input.metadata().location_id,
1190                            sink_expr,
1191                            &input_ident,
1192                            serialize_fn.as_ref(),
1193                            *next_stmt_id,
1194                        );
1195                    }
1196                    BuildersOrCallback::Callback(leaf_callback, _) => {
1197                        leaf_callback(self, next_stmt_id);
1198                    }
1199                }
1200
1201                *next_stmt_id += 1;
1202            }
1203
1204            HydroRoot::DestSink { sink, input, .. } => {
1205                let input_ident =
1206                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1207
1208                match builders_or_callback {
1209                    BuildersOrCallback::Builders(graph_builders) => {
1210                        graph_builders
1211                            .get_dfir_mut(&input.metadata().location_id)
1212                            .add_dfir(
1213                                parse_quote! {
1214                                    #input_ident -> dest_sink(#sink);
1215                                },
1216                                None,
1217                                Some(&next_stmt_id.to_string()),
1218                            );
1219                    }
1220                    BuildersOrCallback::Callback(leaf_callback, _) => {
1221                        leaf_callback(self, next_stmt_id);
1222                    }
1223                }
1224
1225                *next_stmt_id += 1;
1226            }
1227
1228            HydroRoot::CycleSink {
1229                cycle_id, input, ..
1230            } => {
1231                let input_ident =
1232                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1233
1234                match builders_or_callback {
1235                    BuildersOrCallback::Builders(graph_builders) => {
1236                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1237                            CollectionKind::KeyedSingleton {
1238                                key_type,
1239                                value_type,
1240                                ..
1241                            }
1242                            | CollectionKind::KeyedStream {
1243                                key_type,
1244                                value_type,
1245                                ..
1246                            } => {
1247                                parse_quote!((#key_type, #value_type))
1248                            }
1249                            CollectionKind::Stream { element_type, .. }
1250                            | CollectionKind::Singleton { element_type, .. }
1251                            | CollectionKind::Optional { element_type, .. } => {
1252                                parse_quote!(#element_type)
1253                            }
1254                        };
1255
1256                        let cycle_id_ident = cycle_id.as_ident();
1257                        graph_builders
1258                            .get_dfir_mut(&input.metadata().location_id)
1259                            .add_dfir(
1260                                parse_quote! {
1261                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1262                                },
1263                                None,
1264                                None,
1265                            );
1266                    }
1267                    // No ID, no callback
1268                    BuildersOrCallback::Callback(_, _) => {}
1269                }
1270            }
1271
1272            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1273                let input_ident =
1274                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1275
1276                match builders_or_callback {
1277                    BuildersOrCallback::Builders(graph_builders) => {
1278                        graph_builders
1279                            .get_dfir_mut(&input.metadata().location_id)
1280                            .add_dfir(
1281                                parse_quote! {
1282                                    #input_ident -> for_each(&mut #ident);
1283                                },
1284                                None,
1285                                Some(&next_stmt_id.to_string()),
1286                            );
1287                    }
1288                    BuildersOrCallback::Callback(leaf_callback, _) => {
1289                        leaf_callback(self, next_stmt_id);
1290                    }
1291                }
1292
1293                *next_stmt_id += 1;
1294            }
1295        }
1296    }
1297
1298    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1299        match self {
1300            HydroRoot::ForEach { op_metadata, .. }
1301            | HydroRoot::SendExternal { op_metadata, .. }
1302            | HydroRoot::DestSink { op_metadata, .. }
1303            | HydroRoot::CycleSink { op_metadata, .. }
1304            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1305        }
1306    }
1307
1308    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1309        match self {
1310            HydroRoot::ForEach { op_metadata, .. }
1311            | HydroRoot::SendExternal { op_metadata, .. }
1312            | HydroRoot::DestSink { op_metadata, .. }
1313            | HydroRoot::CycleSink { op_metadata, .. }
1314            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1315        }
1316    }
1317
1318    pub fn input(&self) -> &HydroNode {
1319        match self {
1320            HydroRoot::ForEach { input, .. }
1321            | HydroRoot::SendExternal { input, .. }
1322            | HydroRoot::DestSink { input, .. }
1323            | HydroRoot::CycleSink { input, .. }
1324            | HydroRoot::EmbeddedOutput { input, .. } => input,
1325        }
1326    }
1327
1328    pub fn input_metadata(&self) -> &HydroIrMetadata {
1329        self.input().metadata()
1330    }
1331
1332    pub fn print_root(&self) -> String {
1333        match self {
1334            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1335            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1336            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1337            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1338            HydroRoot::EmbeddedOutput { ident, .. } => {
1339                format!("EmbeddedOutput({})", ident)
1340            }
1341        }
1342    }
1343
1344    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1345        match self {
1346            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1347                transform(f);
1348            }
1349            HydroRoot::SendExternal { .. }
1350            | HydroRoot::CycleSink { .. }
1351            | HydroRoot::EmbeddedOutput { .. } => {}
1352        }
1353    }
1354}
1355
1356#[cfg(feature = "build")]
1357pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1358    let mut builders = SecondaryMap::new();
1359    let mut seen_tees = HashMap::new();
1360    let mut built_tees = HashMap::new();
1361    let mut next_stmt_id = 0;
1362    for leaf in ir {
1363        leaf.emit(
1364            &mut builders,
1365            &mut seen_tees,
1366            &mut built_tees,
1367            &mut next_stmt_id,
1368        );
1369    }
1370    builders
1371}
1372
1373#[cfg(feature = "build")]
1374pub fn traverse_dfir(
1375    ir: &mut [HydroRoot],
1376    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1377    transform_node: impl FnMut(&mut HydroNode, &mut usize),
1378) {
1379    let mut seen_tees = HashMap::new();
1380    let mut built_tees = HashMap::new();
1381    let mut next_stmt_id = 0;
1382    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1383    ir.iter_mut().for_each(|leaf| {
1384        leaf.emit_core(
1385            &mut callback,
1386            &mut seen_tees,
1387            &mut built_tees,
1388            &mut next_stmt_id,
1389        );
1390    });
1391}
1392
1393pub fn transform_bottom_up(
1394    ir: &mut [HydroRoot],
1395    transform_root: &mut impl FnMut(&mut HydroRoot),
1396    transform_node: &mut impl FnMut(&mut HydroNode),
1397    check_well_formed: bool,
1398) {
1399    let mut seen_tees = HashMap::new();
1400    ir.iter_mut().for_each(|leaf| {
1401        leaf.transform_bottom_up(
1402            transform_root,
1403            transform_node,
1404            &mut seen_tees,
1405            check_well_formed,
1406        );
1407    });
1408}
1409
1410pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1411    let mut seen_tees = HashMap::new();
1412    ir.iter()
1413        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1414        .collect()
1415}
1416
1417type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1418thread_local! {
1419    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1420}
1421
1422pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1423    PRINTED_TEES.with(|printed_tees| {
1424        let mut printed_tees_mut = printed_tees.borrow_mut();
1425        *printed_tees_mut = Some((0, HashMap::new()));
1426        drop(printed_tees_mut);
1427
1428        let ret = f();
1429
1430        let mut printed_tees_mut = printed_tees.borrow_mut();
1431        *printed_tees_mut = None;
1432
1433        ret
1434    })
1435}
1436
1437pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1438
1439impl TeeNode {
1440    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1441        Rc::as_ptr(&self.0)
1442    }
1443}
1444
1445impl Debug for TeeNode {
1446    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1447        PRINTED_TEES.with(|printed_tees| {
1448            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1449            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1450
1451            if let Some(printed_tees_mut) = printed_tees_mut {
1452                if let Some(existing) = printed_tees_mut
1453                    .1
1454                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1455                {
1456                    write!(f, "<tee {}>", existing)
1457                } else {
1458                    let next_id = printed_tees_mut.0;
1459                    printed_tees_mut.0 += 1;
1460                    printed_tees_mut
1461                        .1
1462                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1463                    drop(printed_tees_mut_borrow);
1464                    write!(f, "<tee {}>: ", next_id)?;
1465                    Debug::fmt(&self.0.borrow(), f)
1466                }
1467            } else {
1468                drop(printed_tees_mut_borrow);
1469                write!(f, "<tee>: ")?;
1470                Debug::fmt(&self.0.borrow(), f)
1471            }
1472        })
1473    }
1474}
1475
1476impl Hash for TeeNode {
1477    fn hash<H: Hasher>(&self, state: &mut H) {
1478        self.0.borrow_mut().hash(state);
1479    }
1480}
1481
1482#[derive(Clone, PartialEq, Eq, Debug)]
1483pub enum BoundKind {
1484    Unbounded,
1485    Bounded,
1486}
1487
1488#[derive(Clone, PartialEq, Eq, Debug)]
1489pub enum StreamOrder {
1490    NoOrder,
1491    TotalOrder,
1492}
1493
1494#[derive(Clone, PartialEq, Eq, Debug)]
1495pub enum StreamRetry {
1496    AtLeastOnce,
1497    ExactlyOnce,
1498}
1499
1500#[derive(Clone, PartialEq, Eq, Debug)]
1501pub enum KeyedSingletonBoundKind {
1502    Unbounded,
1503    BoundedValue,
1504    Bounded,
1505}
1506
1507#[derive(Clone, PartialEq, Eq, Debug)]
1508pub enum CollectionKind {
1509    Stream {
1510        bound: BoundKind,
1511        order: StreamOrder,
1512        retry: StreamRetry,
1513        element_type: DebugType,
1514    },
1515    Singleton {
1516        bound: BoundKind,
1517        element_type: DebugType,
1518    },
1519    Optional {
1520        bound: BoundKind,
1521        element_type: DebugType,
1522    },
1523    KeyedStream {
1524        bound: BoundKind,
1525        value_order: StreamOrder,
1526        value_retry: StreamRetry,
1527        key_type: DebugType,
1528        value_type: DebugType,
1529    },
1530    KeyedSingleton {
1531        bound: KeyedSingletonBoundKind,
1532        key_type: DebugType,
1533        value_type: DebugType,
1534    },
1535}
1536
1537impl CollectionKind {
1538    pub fn is_bounded(&self) -> bool {
1539        matches!(
1540            self,
1541            CollectionKind::Stream {
1542                bound: BoundKind::Bounded,
1543                ..
1544            } | CollectionKind::Singleton {
1545                bound: BoundKind::Bounded,
1546                ..
1547            } | CollectionKind::Optional {
1548                bound: BoundKind::Bounded,
1549                ..
1550            } | CollectionKind::KeyedStream {
1551                bound: BoundKind::Bounded,
1552                ..
1553            } | CollectionKind::KeyedSingleton {
1554                bound: KeyedSingletonBoundKind::Bounded,
1555                ..
1556            }
1557        )
1558    }
1559}
1560
1561#[derive(Clone)]
1562pub struct HydroIrMetadata {
1563    pub location_id: LocationId,
1564    pub collection_kind: CollectionKind,
1565    pub cardinality: Option<usize>,
1566    pub tag: Option<String>,
1567    pub op: HydroIrOpMetadata,
1568}
1569
1570// HydroIrMetadata shouldn't be used to hash or compare
1571impl Hash for HydroIrMetadata {
1572    fn hash<H: Hasher>(&self, _: &mut H) {}
1573}
1574
1575impl PartialEq for HydroIrMetadata {
1576    fn eq(&self, _: &Self) -> bool {
1577        true
1578    }
1579}
1580
1581impl Eq for HydroIrMetadata {}
1582
1583impl Debug for HydroIrMetadata {
1584    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1585        f.debug_struct("HydroIrMetadata")
1586            .field("location_id", &self.location_id)
1587            .field("collection_kind", &self.collection_kind)
1588            .finish()
1589    }
1590}
1591
1592/// Metadata that is specific to the operator itself, rather than its outputs.
1593/// This is available on _both_ inner nodes and roots.
1594#[derive(Clone)]
1595pub struct HydroIrOpMetadata {
1596    pub backtrace: Backtrace,
1597    pub cpu_usage: Option<f64>,
1598    pub network_recv_cpu_usage: Option<f64>,
1599    pub id: Option<usize>,
1600}
1601
1602impl HydroIrOpMetadata {
1603    #[expect(
1604        clippy::new_without_default,
1605        reason = "explicit calls to new ensure correct backtrace bounds"
1606    )]
1607    pub fn new() -> HydroIrOpMetadata {
1608        Self::new_with_skip(1)
1609    }
1610
1611    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1612        HydroIrOpMetadata {
1613            backtrace: Backtrace::get_backtrace(2 + skip_count),
1614            cpu_usage: None,
1615            network_recv_cpu_usage: None,
1616            id: None,
1617        }
1618    }
1619}
1620
1621impl Debug for HydroIrOpMetadata {
1622    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1623        f.debug_struct("HydroIrOpMetadata").finish()
1624    }
1625}
1626
1627impl Hash for HydroIrOpMetadata {
1628    fn hash<H: Hasher>(&self, _: &mut H) {}
1629}
1630
1631/// An intermediate node in a Hydro graph, which consumes data
1632/// from upstream nodes and emits data to downstream nodes.
1633#[derive(Debug, Hash)]
1634pub enum HydroNode {
1635    Placeholder,
1636
1637    /// Manually "casts" between two different collection kinds.
1638    ///
1639    /// Using this IR node requires special care, since it bypasses many of Hydro's core
1640    /// correctness checks. In particular, the user must ensure that every possible
1641    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
1642    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
1643    /// collection. This ensures that the simulator does not miss any possible outputs.
1644    Cast {
1645        inner: Box<HydroNode>,
1646        metadata: HydroIrMetadata,
1647    },
1648
1649    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
1650    /// interpretation of the input stream.
1651    ///
1652    /// In production, this simply passes through the input, but in simulation, this operator
1653    /// explicitly selects a randomized interpretation.
1654    ObserveNonDet {
1655        inner: Box<HydroNode>,
1656        trusted: bool, // if true, we do not need to simulate non-determinism
1657        metadata: HydroIrMetadata,
1658    },
1659
1660    Source {
1661        source: HydroSource,
1662        metadata: HydroIrMetadata,
1663    },
1664
1665    SingletonSource {
1666        value: DebugExpr,
1667        metadata: HydroIrMetadata,
1668    },
1669
1670    CycleSource {
1671        cycle_id: CycleId,
1672        metadata: HydroIrMetadata,
1673    },
1674
1675    Tee {
1676        inner: TeeNode,
1677        metadata: HydroIrMetadata,
1678    },
1679
1680    BeginAtomic {
1681        inner: Box<HydroNode>,
1682        metadata: HydroIrMetadata,
1683    },
1684
1685    EndAtomic {
1686        inner: Box<HydroNode>,
1687        metadata: HydroIrMetadata,
1688    },
1689
1690    Batch {
1691        inner: Box<HydroNode>,
1692        metadata: HydroIrMetadata,
1693    },
1694
1695    YieldConcat {
1696        inner: Box<HydroNode>,
1697        metadata: HydroIrMetadata,
1698    },
1699
1700    Chain {
1701        first: Box<HydroNode>,
1702        second: Box<HydroNode>,
1703        metadata: HydroIrMetadata,
1704    },
1705
1706    ChainFirst {
1707        first: Box<HydroNode>,
1708        second: Box<HydroNode>,
1709        metadata: HydroIrMetadata,
1710    },
1711
1712    CrossProduct {
1713        left: Box<HydroNode>,
1714        right: Box<HydroNode>,
1715        metadata: HydroIrMetadata,
1716    },
1717
1718    CrossSingleton {
1719        left: Box<HydroNode>,
1720        right: Box<HydroNode>,
1721        metadata: HydroIrMetadata,
1722    },
1723
1724    Join {
1725        left: Box<HydroNode>,
1726        right: Box<HydroNode>,
1727        metadata: HydroIrMetadata,
1728    },
1729
1730    Difference {
1731        pos: Box<HydroNode>,
1732        neg: Box<HydroNode>,
1733        metadata: HydroIrMetadata,
1734    },
1735
1736    AntiJoin {
1737        pos: Box<HydroNode>,
1738        neg: Box<HydroNode>,
1739        metadata: HydroIrMetadata,
1740    },
1741
1742    ResolveFutures {
1743        input: Box<HydroNode>,
1744        metadata: HydroIrMetadata,
1745    },
1746    ResolveFuturesOrdered {
1747        input: Box<HydroNode>,
1748        metadata: HydroIrMetadata,
1749    },
1750
1751    Map {
1752        f: DebugExpr,
1753        input: Box<HydroNode>,
1754        metadata: HydroIrMetadata,
1755    },
1756    FlatMap {
1757        f: DebugExpr,
1758        input: Box<HydroNode>,
1759        metadata: HydroIrMetadata,
1760    },
1761    Filter {
1762        f: DebugExpr,
1763        input: Box<HydroNode>,
1764        metadata: HydroIrMetadata,
1765    },
1766    FilterMap {
1767        f: DebugExpr,
1768        input: Box<HydroNode>,
1769        metadata: HydroIrMetadata,
1770    },
1771
1772    DeferTick {
1773        input: Box<HydroNode>,
1774        metadata: HydroIrMetadata,
1775    },
1776    Enumerate {
1777        input: Box<HydroNode>,
1778        metadata: HydroIrMetadata,
1779    },
1780    Inspect {
1781        f: DebugExpr,
1782        input: Box<HydroNode>,
1783        metadata: HydroIrMetadata,
1784    },
1785
1786    Unique {
1787        input: Box<HydroNode>,
1788        metadata: HydroIrMetadata,
1789    },
1790
1791    Sort {
1792        input: Box<HydroNode>,
1793        metadata: HydroIrMetadata,
1794    },
1795    Fold {
1796        init: DebugExpr,
1797        acc: DebugExpr,
1798        input: Box<HydroNode>,
1799        metadata: HydroIrMetadata,
1800    },
1801
1802    Scan {
1803        init: DebugExpr,
1804        acc: DebugExpr,
1805        input: Box<HydroNode>,
1806        metadata: HydroIrMetadata,
1807    },
1808    FoldKeyed {
1809        init: DebugExpr,
1810        acc: DebugExpr,
1811        input: Box<HydroNode>,
1812        metadata: HydroIrMetadata,
1813    },
1814
1815    Reduce {
1816        f: DebugExpr,
1817        input: Box<HydroNode>,
1818        metadata: HydroIrMetadata,
1819    },
1820    ReduceKeyed {
1821        f: DebugExpr,
1822        input: Box<HydroNode>,
1823        metadata: HydroIrMetadata,
1824    },
1825    ReduceKeyedWatermark {
1826        f: DebugExpr,
1827        input: Box<HydroNode>,
1828        watermark: Box<HydroNode>,
1829        metadata: HydroIrMetadata,
1830    },
1831
1832    Network {
1833        name: Option<String>,
1834        networking_info: crate::networking::NetworkingInfo,
1835        serialize_fn: Option<DebugExpr>,
1836        instantiate_fn: DebugInstantiate,
1837        deserialize_fn: Option<DebugExpr>,
1838        input: Box<HydroNode>,
1839        metadata: HydroIrMetadata,
1840    },
1841
1842    ExternalInput {
1843        from_external_key: LocationKey,
1844        from_port_id: ExternalPortId,
1845        from_many: bool,
1846        codec_type: DebugType,
1847        port_hint: NetworkHint,
1848        instantiate_fn: DebugInstantiate,
1849        deserialize_fn: Option<DebugExpr>,
1850        metadata: HydroIrMetadata,
1851    },
1852
1853    Counter {
1854        tag: String,
1855        duration: DebugExpr,
1856        prefix: String,
1857        input: Box<HydroNode>,
1858        metadata: HydroIrMetadata,
1859    },
1860}
1861
1862pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1863pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1864
1865impl HydroNode {
1866    pub fn transform_bottom_up(
1867        &mut self,
1868        transform: &mut impl FnMut(&mut HydroNode),
1869        seen_tees: &mut SeenTees,
1870        check_well_formed: bool,
1871    ) {
1872        self.transform_children(
1873            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1874            seen_tees,
1875        );
1876
1877        transform(self);
1878
1879        let self_location = self.metadata().location_id.root();
1880
1881        if check_well_formed {
1882            match &*self {
1883                HydroNode::Network { .. } => {}
1884                _ => {
1885                    self.input_metadata().iter().for_each(|i| {
1886                        if i.location_id.root() != self_location {
1887                            panic!(
1888                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1889                                i,
1890                                i.location_id.root(),
1891                                self,
1892                                self_location
1893                            )
1894                        }
1895                    });
1896                }
1897            }
1898        }
1899    }
1900
1901    #[inline(always)]
1902    pub fn transform_children(
1903        &mut self,
1904        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1905        seen_tees: &mut SeenTees,
1906    ) {
1907        match self {
1908            HydroNode::Placeholder => {
1909                panic!();
1910            }
1911
1912            HydroNode::Source { .. }
1913            | HydroNode::SingletonSource { .. }
1914            | HydroNode::CycleSource { .. }
1915            | HydroNode::ExternalInput { .. } => {}
1916
1917            HydroNode::Tee { inner, .. } => {
1918                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1919                    *inner = TeeNode(transformed.clone());
1920                } else {
1921                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1922                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1923                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1924                    transform(&mut orig, seen_tees);
1925                    *transformed_cell.borrow_mut() = orig;
1926                    *inner = TeeNode(transformed_cell);
1927                }
1928            }
1929
1930            HydroNode::Cast { inner, .. }
1931            | HydroNode::ObserveNonDet { inner, .. }
1932            | HydroNode::BeginAtomic { inner, .. }
1933            | HydroNode::EndAtomic { inner, .. }
1934            | HydroNode::Batch { inner, .. }
1935            | HydroNode::YieldConcat { inner, .. } => {
1936                transform(inner.as_mut(), seen_tees);
1937            }
1938
1939            HydroNode::Chain { first, second, .. } => {
1940                transform(first.as_mut(), seen_tees);
1941                transform(second.as_mut(), seen_tees);
1942            }
1943
1944            HydroNode::ChainFirst { first, second, .. } => {
1945                transform(first.as_mut(), seen_tees);
1946                transform(second.as_mut(), seen_tees);
1947            }
1948
1949            HydroNode::CrossSingleton { left, right, .. }
1950            | HydroNode::CrossProduct { left, right, .. }
1951            | HydroNode::Join { left, right, .. } => {
1952                transform(left.as_mut(), seen_tees);
1953                transform(right.as_mut(), seen_tees);
1954            }
1955
1956            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1957                transform(pos.as_mut(), seen_tees);
1958                transform(neg.as_mut(), seen_tees);
1959            }
1960
1961            HydroNode::ReduceKeyedWatermark {
1962                input, watermark, ..
1963            } => {
1964                transform(input.as_mut(), seen_tees);
1965                transform(watermark.as_mut(), seen_tees);
1966            }
1967
1968            HydroNode::Map { input, .. }
1969            | HydroNode::ResolveFutures { input, .. }
1970            | HydroNode::ResolveFuturesOrdered { input, .. }
1971            | HydroNode::FlatMap { input, .. }
1972            | HydroNode::Filter { input, .. }
1973            | HydroNode::FilterMap { input, .. }
1974            | HydroNode::Sort { input, .. }
1975            | HydroNode::DeferTick { input, .. }
1976            | HydroNode::Enumerate { input, .. }
1977            | HydroNode::Inspect { input, .. }
1978            | HydroNode::Unique { input, .. }
1979            | HydroNode::Network { input, .. }
1980            | HydroNode::Fold { input, .. }
1981            | HydroNode::Scan { input, .. }
1982            | HydroNode::FoldKeyed { input, .. }
1983            | HydroNode::Reduce { input, .. }
1984            | HydroNode::ReduceKeyed { input, .. }
1985            | HydroNode::Counter { input, .. } => {
1986                transform(input.as_mut(), seen_tees);
1987            }
1988        }
1989    }
1990
1991    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1992        match self {
1993            HydroNode::Placeholder => HydroNode::Placeholder,
1994            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1995                inner: Box::new(inner.deep_clone(seen_tees)),
1996                metadata: metadata.clone(),
1997            },
1998            HydroNode::ObserveNonDet {
1999                inner,
2000                trusted,
2001                metadata,
2002            } => HydroNode::ObserveNonDet {
2003                inner: Box::new(inner.deep_clone(seen_tees)),
2004                trusted: *trusted,
2005                metadata: metadata.clone(),
2006            },
2007            HydroNode::Source { source, metadata } => HydroNode::Source {
2008                source: source.clone(),
2009                metadata: metadata.clone(),
2010            },
2011            HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
2012                value: value.clone(),
2013                metadata: metadata.clone(),
2014            },
2015            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2016                cycle_id: *cycle_id,
2017                metadata: metadata.clone(),
2018            },
2019            HydroNode::Tee { inner, metadata } => {
2020                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2021                    HydroNode::Tee {
2022                        inner: TeeNode(transformed.clone()),
2023                        metadata: metadata.clone(),
2024                    }
2025                } else {
2026                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2027                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2028                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2029                    *new_rc.borrow_mut() = cloned;
2030                    HydroNode::Tee {
2031                        inner: TeeNode(new_rc),
2032                        metadata: metadata.clone(),
2033                    }
2034                }
2035            }
2036            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2037                inner: Box::new(inner.deep_clone(seen_tees)),
2038                metadata: metadata.clone(),
2039            },
2040            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2041                inner: Box::new(inner.deep_clone(seen_tees)),
2042                metadata: metadata.clone(),
2043            },
2044            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2045                inner: Box::new(inner.deep_clone(seen_tees)),
2046                metadata: metadata.clone(),
2047            },
2048            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2049                inner: Box::new(inner.deep_clone(seen_tees)),
2050                metadata: metadata.clone(),
2051            },
2052            HydroNode::Chain {
2053                first,
2054                second,
2055                metadata,
2056            } => HydroNode::Chain {
2057                first: Box::new(first.deep_clone(seen_tees)),
2058                second: Box::new(second.deep_clone(seen_tees)),
2059                metadata: metadata.clone(),
2060            },
2061            HydroNode::ChainFirst {
2062                first,
2063                second,
2064                metadata,
2065            } => HydroNode::ChainFirst {
2066                first: Box::new(first.deep_clone(seen_tees)),
2067                second: Box::new(second.deep_clone(seen_tees)),
2068                metadata: metadata.clone(),
2069            },
2070            HydroNode::CrossProduct {
2071                left,
2072                right,
2073                metadata,
2074            } => HydroNode::CrossProduct {
2075                left: Box::new(left.deep_clone(seen_tees)),
2076                right: Box::new(right.deep_clone(seen_tees)),
2077                metadata: metadata.clone(),
2078            },
2079            HydroNode::CrossSingleton {
2080                left,
2081                right,
2082                metadata,
2083            } => HydroNode::CrossSingleton {
2084                left: Box::new(left.deep_clone(seen_tees)),
2085                right: Box::new(right.deep_clone(seen_tees)),
2086                metadata: metadata.clone(),
2087            },
2088            HydroNode::Join {
2089                left,
2090                right,
2091                metadata,
2092            } => HydroNode::Join {
2093                left: Box::new(left.deep_clone(seen_tees)),
2094                right: Box::new(right.deep_clone(seen_tees)),
2095                metadata: metadata.clone(),
2096            },
2097            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2098                pos: Box::new(pos.deep_clone(seen_tees)),
2099                neg: Box::new(neg.deep_clone(seen_tees)),
2100                metadata: metadata.clone(),
2101            },
2102            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2103                pos: Box::new(pos.deep_clone(seen_tees)),
2104                neg: Box::new(neg.deep_clone(seen_tees)),
2105                metadata: metadata.clone(),
2106            },
2107            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2108                input: Box::new(input.deep_clone(seen_tees)),
2109                metadata: metadata.clone(),
2110            },
2111            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2112                HydroNode::ResolveFuturesOrdered {
2113                    input: Box::new(input.deep_clone(seen_tees)),
2114                    metadata: metadata.clone(),
2115                }
2116            }
2117            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2118                f: f.clone(),
2119                input: Box::new(input.deep_clone(seen_tees)),
2120                metadata: metadata.clone(),
2121            },
2122            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2123                f: f.clone(),
2124                input: Box::new(input.deep_clone(seen_tees)),
2125                metadata: metadata.clone(),
2126            },
2127            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2128                f: f.clone(),
2129                input: Box::new(input.deep_clone(seen_tees)),
2130                metadata: metadata.clone(),
2131            },
2132            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2133                f: f.clone(),
2134                input: Box::new(input.deep_clone(seen_tees)),
2135                metadata: metadata.clone(),
2136            },
2137            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2138                input: Box::new(input.deep_clone(seen_tees)),
2139                metadata: metadata.clone(),
2140            },
2141            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2142                input: Box::new(input.deep_clone(seen_tees)),
2143                metadata: metadata.clone(),
2144            },
2145            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2146                f: f.clone(),
2147                input: Box::new(input.deep_clone(seen_tees)),
2148                metadata: metadata.clone(),
2149            },
2150            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2151                input: Box::new(input.deep_clone(seen_tees)),
2152                metadata: metadata.clone(),
2153            },
2154            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2155                input: Box::new(input.deep_clone(seen_tees)),
2156                metadata: metadata.clone(),
2157            },
2158            HydroNode::Fold {
2159                init,
2160                acc,
2161                input,
2162                metadata,
2163            } => HydroNode::Fold {
2164                init: init.clone(),
2165                acc: acc.clone(),
2166                input: Box::new(input.deep_clone(seen_tees)),
2167                metadata: metadata.clone(),
2168            },
2169            HydroNode::Scan {
2170                init,
2171                acc,
2172                input,
2173                metadata,
2174            } => HydroNode::Scan {
2175                init: init.clone(),
2176                acc: acc.clone(),
2177                input: Box::new(input.deep_clone(seen_tees)),
2178                metadata: metadata.clone(),
2179            },
2180            HydroNode::FoldKeyed {
2181                init,
2182                acc,
2183                input,
2184                metadata,
2185            } => HydroNode::FoldKeyed {
2186                init: init.clone(),
2187                acc: acc.clone(),
2188                input: Box::new(input.deep_clone(seen_tees)),
2189                metadata: metadata.clone(),
2190            },
2191            HydroNode::ReduceKeyedWatermark {
2192                f,
2193                input,
2194                watermark,
2195                metadata,
2196            } => HydroNode::ReduceKeyedWatermark {
2197                f: f.clone(),
2198                input: Box::new(input.deep_clone(seen_tees)),
2199                watermark: Box::new(watermark.deep_clone(seen_tees)),
2200                metadata: metadata.clone(),
2201            },
2202            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2203                f: f.clone(),
2204                input: Box::new(input.deep_clone(seen_tees)),
2205                metadata: metadata.clone(),
2206            },
2207            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2208                f: f.clone(),
2209                input: Box::new(input.deep_clone(seen_tees)),
2210                metadata: metadata.clone(),
2211            },
2212            HydroNode::Network {
2213                name,
2214                networking_info,
2215                serialize_fn,
2216                instantiate_fn,
2217                deserialize_fn,
2218                input,
2219                metadata,
2220            } => HydroNode::Network {
2221                name: name.clone(),
2222                networking_info: networking_info.clone(),
2223                serialize_fn: serialize_fn.clone(),
2224                instantiate_fn: instantiate_fn.clone(),
2225                deserialize_fn: deserialize_fn.clone(),
2226                input: Box::new(input.deep_clone(seen_tees)),
2227                metadata: metadata.clone(),
2228            },
2229            HydroNode::ExternalInput {
2230                from_external_key,
2231                from_port_id,
2232                from_many,
2233                codec_type,
2234                port_hint,
2235                instantiate_fn,
2236                deserialize_fn,
2237                metadata,
2238            } => HydroNode::ExternalInput {
2239                from_external_key: *from_external_key,
2240                from_port_id: *from_port_id,
2241                from_many: *from_many,
2242                codec_type: codec_type.clone(),
2243                port_hint: *port_hint,
2244                instantiate_fn: instantiate_fn.clone(),
2245                deserialize_fn: deserialize_fn.clone(),
2246                metadata: metadata.clone(),
2247            },
2248            HydroNode::Counter {
2249                tag,
2250                duration,
2251                prefix,
2252                input,
2253                metadata,
2254            } => HydroNode::Counter {
2255                tag: tag.clone(),
2256                duration: duration.clone(),
2257                prefix: prefix.clone(),
2258                input: Box::new(input.deep_clone(seen_tees)),
2259                metadata: metadata.clone(),
2260            },
2261        }
2262    }
2263
2264    #[cfg(feature = "build")]
2265    pub fn emit_core(
2266        &mut self,
2267        builders_or_callback: &mut BuildersOrCallback<
2268            impl FnMut(&mut HydroRoot, &mut usize),
2269            impl FnMut(&mut HydroNode, &mut usize),
2270        >,
2271        seen_tees: &mut SeenTees,
2272        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2273        next_stmt_id: &mut usize,
2274    ) -> syn::Ident {
2275        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2276
2277        self.transform_bottom_up(
2278            &mut |node: &mut HydroNode| {
2279                let out_location = node.metadata().location_id.clone();
2280                match node {
2281                    HydroNode::Placeholder => {
2282                        panic!()
2283                    }
2284
2285                    HydroNode::Cast { .. } => {
2286                        // Cast passes through the input ident unchanged
2287                        // The input ident is already on the stack from processing the child
2288                        match builders_or_callback {
2289                            BuildersOrCallback::Builders(_) => {}
2290                            BuildersOrCallback::Callback(_, node_callback) => {
2291                                node_callback(node, next_stmt_id);
2292                            }
2293                        }
2294
2295                        *next_stmt_id += 1;
2296                        // input_ident stays on stack as output
2297                    }
2298
2299                    HydroNode::ObserveNonDet {
2300                        inner,
2301                        trusted,
2302                        metadata,
2303                        ..
2304                    } => {
2305                        let inner_ident = ident_stack.pop().unwrap();
2306
2307                        let observe_ident =
2308                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2309
2310                        match builders_or_callback {
2311                            BuildersOrCallback::Builders(graph_builders) => {
2312                                graph_builders.observe_nondet(
2313                                    *trusted,
2314                                    &inner.metadata().location_id,
2315                                    inner_ident,
2316                                    &inner.metadata().collection_kind,
2317                                    &observe_ident,
2318                                    &metadata.collection_kind,
2319                                    &metadata.op,
2320                                );
2321                            }
2322                            BuildersOrCallback::Callback(_, node_callback) => {
2323                                node_callback(node, next_stmt_id);
2324                            }
2325                        }
2326
2327                        *next_stmt_id += 1;
2328
2329                        ident_stack.push(observe_ident);
2330                    }
2331
2332                    HydroNode::Batch {
2333                        inner, metadata, ..
2334                    } => {
2335                        let inner_ident = ident_stack.pop().unwrap();
2336
2337                        let batch_ident =
2338                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2339
2340                        match builders_or_callback {
2341                            BuildersOrCallback::Builders(graph_builders) => {
2342                                graph_builders.batch(
2343                                    inner_ident,
2344                                    &inner.metadata().location_id,
2345                                    &inner.metadata().collection_kind,
2346                                    &batch_ident,
2347                                    &out_location,
2348                                    &metadata.op,
2349                                );
2350                            }
2351                            BuildersOrCallback::Callback(_, node_callback) => {
2352                                node_callback(node, next_stmt_id);
2353                            }
2354                        }
2355
2356                        *next_stmt_id += 1;
2357
2358                        ident_stack.push(batch_ident);
2359                    }
2360
2361                    HydroNode::YieldConcat { inner, .. } => {
2362                        let inner_ident = ident_stack.pop().unwrap();
2363
2364                        let yield_ident =
2365                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2366
2367                        match builders_or_callback {
2368                            BuildersOrCallback::Builders(graph_builders) => {
2369                                graph_builders.yield_from_tick(
2370                                    inner_ident,
2371                                    &inner.metadata().location_id,
2372                                    &inner.metadata().collection_kind,
2373                                    &yield_ident,
2374                                    &out_location,
2375                                );
2376                            }
2377                            BuildersOrCallback::Callback(_, node_callback) => {
2378                                node_callback(node, next_stmt_id);
2379                            }
2380                        }
2381
2382                        *next_stmt_id += 1;
2383
2384                        ident_stack.push(yield_ident);
2385                    }
2386
2387                    HydroNode::BeginAtomic { inner, metadata } => {
2388                        let inner_ident = ident_stack.pop().unwrap();
2389
2390                        let begin_ident =
2391                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2392
2393                        match builders_or_callback {
2394                            BuildersOrCallback::Builders(graph_builders) => {
2395                                graph_builders.begin_atomic(
2396                                    inner_ident,
2397                                    &inner.metadata().location_id,
2398                                    &inner.metadata().collection_kind,
2399                                    &begin_ident,
2400                                    &out_location,
2401                                    &metadata.op,
2402                                );
2403                            }
2404                            BuildersOrCallback::Callback(_, node_callback) => {
2405                                node_callback(node, next_stmt_id);
2406                            }
2407                        }
2408
2409                        *next_stmt_id += 1;
2410
2411                        ident_stack.push(begin_ident);
2412                    }
2413
2414                    HydroNode::EndAtomic { inner, .. } => {
2415                        let inner_ident = ident_stack.pop().unwrap();
2416
2417                        let end_ident =
2418                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2419
2420                        match builders_or_callback {
2421                            BuildersOrCallback::Builders(graph_builders) => {
2422                                graph_builders.end_atomic(
2423                                    inner_ident,
2424                                    &inner.metadata().location_id,
2425                                    &inner.metadata().collection_kind,
2426                                    &end_ident,
2427                                );
2428                            }
2429                            BuildersOrCallback::Callback(_, node_callback) => {
2430                                node_callback(node, next_stmt_id);
2431                            }
2432                        }
2433
2434                        *next_stmt_id += 1;
2435
2436                        ident_stack.push(end_ident);
2437                    }
2438
2439                    HydroNode::Source {
2440                        source, metadata, ..
2441                    } => {
2442                        if let HydroSource::ExternalNetwork() = source {
2443                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2444                        } else {
2445                            let source_ident =
2446                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2447
2448                            let source_stmt = match source {
2449                                HydroSource::Stream(expr) => {
2450                                    debug_assert!(metadata.location_id.is_top_level());
2451                                    parse_quote! {
2452                                        #source_ident = source_stream(#expr);
2453                                    }
2454                                }
2455
2456                                HydroSource::ExternalNetwork() => {
2457                                    unreachable!()
2458                                }
2459
2460                                HydroSource::Iter(expr) => {
2461                                    if metadata.location_id.is_top_level() {
2462                                        parse_quote! {
2463                                            #source_ident = source_iter(#expr);
2464                                        }
2465                                    } else {
2466                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
2467                                        parse_quote! {
2468                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2469                                        }
2470                                    }
2471                                }
2472
2473                                HydroSource::Spin() => {
2474                                    debug_assert!(metadata.location_id.is_top_level());
2475                                    parse_quote! {
2476                                        #source_ident = spin();
2477                                    }
2478                                }
2479
2480                                HydroSource::ClusterMembers(target_loc, state) => {
2481                                    debug_assert!(metadata.location_id.is_top_level());
2482
2483                                    let members_tee_ident = syn::Ident::new(
2484                                        &format!(
2485                                            "__cluster_members_tee_{}_{}",
2486                                            metadata.location_id.root().key(),
2487                                            target_loc.key(),
2488                                        ),
2489                                        Span::call_site(),
2490                                    );
2491
2492                                    match state {
2493                                        ClusterMembersState::Stream(d) => {
2494                                            parse_quote! {
2495                                                #members_tee_ident = source_stream(#d) -> tee();
2496                                                #source_ident = #members_tee_ident;
2497                                            }
2498                                        },
2499                                        ClusterMembersState::Uninit => syn::parse_quote! {
2500                                            #source_ident = source_stream(DUMMY);
2501                                        },
2502                                        ClusterMembersState::Tee(..) => parse_quote! {
2503                                            #source_ident = #members_tee_ident;
2504                                        },
2505                                    }
2506                                }
2507
2508                                HydroSource::Embedded(ident) => {
2509                                    parse_quote! {
2510                                        #source_ident = source_stream(#ident);
2511                                    }
2512                                }
2513                            };
2514
2515                            match builders_or_callback {
2516                                BuildersOrCallback::Builders(graph_builders) => {
2517                                    let builder = graph_builders.get_dfir_mut(&out_location);
2518                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2519                                }
2520                                BuildersOrCallback::Callback(_, node_callback) => {
2521                                    node_callback(node, next_stmt_id);
2522                                }
2523                            }
2524
2525                            *next_stmt_id += 1;
2526
2527                            ident_stack.push(source_ident);
2528                        }
2529                    }
2530
2531                    HydroNode::SingletonSource { value, metadata } => {
2532                        let source_ident =
2533                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2534
2535                        match builders_or_callback {
2536                            BuildersOrCallback::Builders(graph_builders) => {
2537                                let builder = graph_builders.get_dfir_mut(&out_location);
2538
2539                                if metadata.location_id.is_top_level()
2540                                    && metadata.collection_kind.is_bounded()
2541                                {
2542                                    builder.add_dfir(
2543                                        parse_quote! {
2544                                            #source_ident = source_iter([#value]);
2545                                        },
2546                                        None,
2547                                        Some(&next_stmt_id.to_string()),
2548                                    );
2549                                } else {
2550                                    builder.add_dfir(
2551                                        parse_quote! {
2552                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2553                                        },
2554                                        None,
2555                                        Some(&next_stmt_id.to_string()),
2556                                    );
2557                                }
2558                            }
2559                            BuildersOrCallback::Callback(_, node_callback) => {
2560                                node_callback(node, next_stmt_id);
2561                            }
2562                        }
2563
2564                        *next_stmt_id += 1;
2565
2566                        ident_stack.push(source_ident);
2567                    }
2568
2569                    HydroNode::CycleSource { cycle_id, .. } => {
2570                        let ident = cycle_id.as_ident();
2571
2572                        match builders_or_callback {
2573                            BuildersOrCallback::Builders(_) => {}
2574                            BuildersOrCallback::Callback(_, node_callback) => {
2575                                node_callback(node, next_stmt_id);
2576                            }
2577                        }
2578
2579                        // consume a stmt id even though we did not emit anything so that we can instrument this
2580                        *next_stmt_id += 1;
2581
2582                        ident_stack.push(ident);
2583                    }
2584
2585                    HydroNode::Tee { inner, .. } => {
2586                        let ret_ident = if let Some(teed_from) =
2587                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2588                        {
2589                            match builders_or_callback {
2590                                BuildersOrCallback::Builders(_) => {}
2591                                BuildersOrCallback::Callback(_, node_callback) => {
2592                                    node_callback(node, next_stmt_id);
2593                                }
2594                            }
2595
2596                            teed_from.clone()
2597                        } else {
2598                            // The inner node was already processed by transform_bottom_up,
2599                            // so its ident is on the stack
2600                            let inner_ident = ident_stack.pop().unwrap();
2601
2602                            let tee_ident =
2603                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2604
2605                            built_tees.insert(
2606                                inner.0.as_ref() as *const RefCell<HydroNode>,
2607                                tee_ident.clone(),
2608                            );
2609
2610                            match builders_or_callback {
2611                                BuildersOrCallback::Builders(graph_builders) => {
2612                                    let builder = graph_builders.get_dfir_mut(&out_location);
2613                                    builder.add_dfir(
2614                                        parse_quote! {
2615                                            #tee_ident = #inner_ident -> tee();
2616                                        },
2617                                        None,
2618                                        Some(&next_stmt_id.to_string()),
2619                                    );
2620                                }
2621                                BuildersOrCallback::Callback(_, node_callback) => {
2622                                    node_callback(node, next_stmt_id);
2623                                }
2624                            }
2625
2626                            tee_ident
2627                        };
2628
2629                        // we consume a stmt id regardless of if we emit the tee() operator,
2630                        // so that during rewrites we touch all recipients of the tee()
2631
2632                        *next_stmt_id += 1;
2633                        ident_stack.push(ret_ident);
2634                    }
2635
2636                    HydroNode::Chain { .. } => {
2637                        // Children are processed left-to-right, so second is on top
2638                        let second_ident = ident_stack.pop().unwrap();
2639                        let first_ident = ident_stack.pop().unwrap();
2640
2641                        let chain_ident =
2642                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2643
2644                        match builders_or_callback {
2645                            BuildersOrCallback::Builders(graph_builders) => {
2646                                let builder = graph_builders.get_dfir_mut(&out_location);
2647                                builder.add_dfir(
2648                                    parse_quote! {
2649                                        #chain_ident = chain();
2650                                        #first_ident -> [0]#chain_ident;
2651                                        #second_ident -> [1]#chain_ident;
2652                                    },
2653                                    None,
2654                                    Some(&next_stmt_id.to_string()),
2655                                );
2656                            }
2657                            BuildersOrCallback::Callback(_, node_callback) => {
2658                                node_callback(node, next_stmt_id);
2659                            }
2660                        }
2661
2662                        *next_stmt_id += 1;
2663
2664                        ident_stack.push(chain_ident);
2665                    }
2666
2667                    HydroNode::ChainFirst { .. } => {
2668                        let second_ident = ident_stack.pop().unwrap();
2669                        let first_ident = ident_stack.pop().unwrap();
2670
2671                        let chain_ident =
2672                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2673
2674                        match builders_or_callback {
2675                            BuildersOrCallback::Builders(graph_builders) => {
2676                                let builder = graph_builders.get_dfir_mut(&out_location);
2677                                builder.add_dfir(
2678                                    parse_quote! {
2679                                        #chain_ident = chain_first_n(1);
2680                                        #first_ident -> [0]#chain_ident;
2681                                        #second_ident -> [1]#chain_ident;
2682                                    },
2683                                    None,
2684                                    Some(&next_stmt_id.to_string()),
2685                                );
2686                            }
2687                            BuildersOrCallback::Callback(_, node_callback) => {
2688                                node_callback(node, next_stmt_id);
2689                            }
2690                        }
2691
2692                        *next_stmt_id += 1;
2693
2694                        ident_stack.push(chain_ident);
2695                    }
2696
2697                    HydroNode::CrossSingleton { right, .. } => {
2698                        let right_ident = ident_stack.pop().unwrap();
2699                        let left_ident = ident_stack.pop().unwrap();
2700
2701                        let cross_ident =
2702                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2703
2704                        match builders_or_callback {
2705                            BuildersOrCallback::Builders(graph_builders) => {
2706                                let builder = graph_builders.get_dfir_mut(&out_location);
2707
2708                                if right.metadata().location_id.is_top_level()
2709                                    && right.metadata().collection_kind.is_bounded()
2710                                {
2711                                    builder.add_dfir(
2712                                        parse_quote! {
2713                                            #cross_ident = cross_singleton();
2714                                            #left_ident -> [input]#cross_ident;
2715                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
2716                                        },
2717                                        None,
2718                                        Some(&next_stmt_id.to_string()),
2719                                    );
2720                                } else {
2721                                    builder.add_dfir(
2722                                        parse_quote! {
2723                                            #cross_ident = cross_singleton();
2724                                            #left_ident -> [input]#cross_ident;
2725                                            #right_ident -> [single]#cross_ident;
2726                                        },
2727                                        None,
2728                                        Some(&next_stmt_id.to_string()),
2729                                    );
2730                                }
2731                            }
2732                            BuildersOrCallback::Callback(_, node_callback) => {
2733                                node_callback(node, next_stmt_id);
2734                            }
2735                        }
2736
2737                        *next_stmt_id += 1;
2738
2739                        ident_stack.push(cross_ident);
2740                    }
2741
2742                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2743                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2744                            parse_quote!(cross_join_multiset)
2745                        } else {
2746                            parse_quote!(join_multiset)
2747                        };
2748
2749                        let (HydroNode::CrossProduct { left, right, .. }
2750                        | HydroNode::Join { left, right, .. }) = node
2751                        else {
2752                            unreachable!()
2753                        };
2754
2755                        let is_top_level = left.metadata().location_id.is_top_level()
2756                            && right.metadata().location_id.is_top_level();
2757                        let left_lifetime = if left.metadata().location_id.is_top_level() {
2758                            quote!('static)
2759                        } else {
2760                            quote!('tick)
2761                        };
2762
2763                        let right_lifetime = if right.metadata().location_id.is_top_level() {
2764                            quote!('static)
2765                        } else {
2766                            quote!('tick)
2767                        };
2768
2769                        let right_ident = ident_stack.pop().unwrap();
2770                        let left_ident = ident_stack.pop().unwrap();
2771
2772                        let stream_ident =
2773                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2774
2775                        match builders_or_callback {
2776                            BuildersOrCallback::Builders(graph_builders) => {
2777                                let builder = graph_builders.get_dfir_mut(&out_location);
2778                                builder.add_dfir(
2779                                    if is_top_level {
2780                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
2781                                        // a multiset_delta() to negate the replay behavior
2782                                        parse_quote! {
2783                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2784                                            #left_ident -> [0]#stream_ident;
2785                                            #right_ident -> [1]#stream_ident;
2786                                        }
2787                                    } else {
2788                                        parse_quote! {
2789                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2790                                            #left_ident -> [0]#stream_ident;
2791                                            #right_ident -> [1]#stream_ident;
2792                                        }
2793                                    }
2794                                    ,
2795                                    None,
2796                                    Some(&next_stmt_id.to_string()),
2797                                );
2798                            }
2799                            BuildersOrCallback::Callback(_, node_callback) => {
2800                                node_callback(node, next_stmt_id);
2801                            }
2802                        }
2803
2804                        *next_stmt_id += 1;
2805
2806                        ident_stack.push(stream_ident);
2807                    }
2808
2809                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2810                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2811                            parse_quote!(difference)
2812                        } else {
2813                            parse_quote!(anti_join)
2814                        };
2815
2816                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2817                            node
2818                        else {
2819                            unreachable!()
2820                        };
2821
2822                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2823                            quote!('static)
2824                        } else {
2825                            quote!('tick)
2826                        };
2827
2828                        let neg_ident = ident_stack.pop().unwrap();
2829                        let pos_ident = ident_stack.pop().unwrap();
2830
2831                        let stream_ident =
2832                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2833
2834                        match builders_or_callback {
2835                            BuildersOrCallback::Builders(graph_builders) => {
2836                                let builder = graph_builders.get_dfir_mut(&out_location);
2837                                builder.add_dfir(
2838                                    parse_quote! {
2839                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
2840                                        #pos_ident -> [pos]#stream_ident;
2841                                        #neg_ident -> [neg]#stream_ident;
2842                                    },
2843                                    None,
2844                                    Some(&next_stmt_id.to_string()),
2845                                );
2846                            }
2847                            BuildersOrCallback::Callback(_, node_callback) => {
2848                                node_callback(node, next_stmt_id);
2849                            }
2850                        }
2851
2852                        *next_stmt_id += 1;
2853
2854                        ident_stack.push(stream_ident);
2855                    }
2856
2857                    HydroNode::ResolveFutures { .. } => {
2858                        let input_ident = ident_stack.pop().unwrap();
2859
2860                        let futures_ident =
2861                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2862
2863                        match builders_or_callback {
2864                            BuildersOrCallback::Builders(graph_builders) => {
2865                                let builder = graph_builders.get_dfir_mut(&out_location);
2866                                builder.add_dfir(
2867                                    parse_quote! {
2868                                        #futures_ident = #input_ident -> resolve_futures();
2869                                    },
2870                                    None,
2871                                    Some(&next_stmt_id.to_string()),
2872                                );
2873                            }
2874                            BuildersOrCallback::Callback(_, node_callback) => {
2875                                node_callback(node, next_stmt_id);
2876                            }
2877                        }
2878
2879                        *next_stmt_id += 1;
2880
2881                        ident_stack.push(futures_ident);
2882                    }
2883
2884                    HydroNode::ResolveFuturesOrdered { .. } => {
2885                        let input_ident = ident_stack.pop().unwrap();
2886
2887                        let futures_ident =
2888                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2889
2890                        match builders_or_callback {
2891                            BuildersOrCallback::Builders(graph_builders) => {
2892                                let builder = graph_builders.get_dfir_mut(&out_location);
2893                                builder.add_dfir(
2894                                    parse_quote! {
2895                                        #futures_ident = #input_ident -> resolve_futures_ordered();
2896                                    },
2897                                    None,
2898                                    Some(&next_stmt_id.to_string()),
2899                                );
2900                            }
2901                            BuildersOrCallback::Callback(_, node_callback) => {
2902                                node_callback(node, next_stmt_id);
2903                            }
2904                        }
2905
2906                        *next_stmt_id += 1;
2907
2908                        ident_stack.push(futures_ident);
2909                    }
2910
2911                    HydroNode::Map { f, .. } => {
2912                        let input_ident = ident_stack.pop().unwrap();
2913
2914                        let map_ident =
2915                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2916
2917                        match builders_or_callback {
2918                            BuildersOrCallback::Builders(graph_builders) => {
2919                                let builder = graph_builders.get_dfir_mut(&out_location);
2920                                builder.add_dfir(
2921                                    parse_quote! {
2922                                        #map_ident = #input_ident -> map(#f);
2923                                    },
2924                                    None,
2925                                    Some(&next_stmt_id.to_string()),
2926                                );
2927                            }
2928                            BuildersOrCallback::Callback(_, node_callback) => {
2929                                node_callback(node, next_stmt_id);
2930                            }
2931                        }
2932
2933                        *next_stmt_id += 1;
2934
2935                        ident_stack.push(map_ident);
2936                    }
2937
2938                    HydroNode::FlatMap { f, .. } => {
2939                        let input_ident = ident_stack.pop().unwrap();
2940
2941                        let flat_map_ident =
2942                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2943
2944                        match builders_or_callback {
2945                            BuildersOrCallback::Builders(graph_builders) => {
2946                                let builder = graph_builders.get_dfir_mut(&out_location);
2947                                builder.add_dfir(
2948                                    parse_quote! {
2949                                        #flat_map_ident = #input_ident -> flat_map(#f);
2950                                    },
2951                                    None,
2952                                    Some(&next_stmt_id.to_string()),
2953                                );
2954                            }
2955                            BuildersOrCallback::Callback(_, node_callback) => {
2956                                node_callback(node, next_stmt_id);
2957                            }
2958                        }
2959
2960                        *next_stmt_id += 1;
2961
2962                        ident_stack.push(flat_map_ident);
2963                    }
2964
2965                    HydroNode::Filter { f, .. } => {
2966                        let input_ident = ident_stack.pop().unwrap();
2967
2968                        let filter_ident =
2969                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2970
2971                        match builders_or_callback {
2972                            BuildersOrCallback::Builders(graph_builders) => {
2973                                let builder = graph_builders.get_dfir_mut(&out_location);
2974                                builder.add_dfir(
2975                                    parse_quote! {
2976                                        #filter_ident = #input_ident -> filter(#f);
2977                                    },
2978                                    None,
2979                                    Some(&next_stmt_id.to_string()),
2980                                );
2981                            }
2982                            BuildersOrCallback::Callback(_, node_callback) => {
2983                                node_callback(node, next_stmt_id);
2984                            }
2985                        }
2986
2987                        *next_stmt_id += 1;
2988
2989                        ident_stack.push(filter_ident);
2990                    }
2991
2992                    HydroNode::FilterMap { f, .. } => {
2993                        let input_ident = ident_stack.pop().unwrap();
2994
2995                        let filter_map_ident =
2996                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2997
2998                        match builders_or_callback {
2999                            BuildersOrCallback::Builders(graph_builders) => {
3000                                let builder = graph_builders.get_dfir_mut(&out_location);
3001                                builder.add_dfir(
3002                                    parse_quote! {
3003                                        #filter_map_ident = #input_ident -> filter_map(#f);
3004                                    },
3005                                    None,
3006                                    Some(&next_stmt_id.to_string()),
3007                                );
3008                            }
3009                            BuildersOrCallback::Callback(_, node_callback) => {
3010                                node_callback(node, next_stmt_id);
3011                            }
3012                        }
3013
3014                        *next_stmt_id += 1;
3015
3016                        ident_stack.push(filter_map_ident);
3017                    }
3018
3019                    HydroNode::Sort { .. } => {
3020                        let input_ident = ident_stack.pop().unwrap();
3021
3022                        let sort_ident =
3023                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3024
3025                        match builders_or_callback {
3026                            BuildersOrCallback::Builders(graph_builders) => {
3027                                let builder = graph_builders.get_dfir_mut(&out_location);
3028                                builder.add_dfir(
3029                                    parse_quote! {
3030                                        #sort_ident = #input_ident -> sort();
3031                                    },
3032                                    None,
3033                                    Some(&next_stmt_id.to_string()),
3034                                );
3035                            }
3036                            BuildersOrCallback::Callback(_, node_callback) => {
3037                                node_callback(node, next_stmt_id);
3038                            }
3039                        }
3040
3041                        *next_stmt_id += 1;
3042
3043                        ident_stack.push(sort_ident);
3044                    }
3045
3046                    HydroNode::DeferTick { .. } => {
3047                        let input_ident = ident_stack.pop().unwrap();
3048
3049                        let defer_tick_ident =
3050                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3051
3052                        match builders_or_callback {
3053                            BuildersOrCallback::Builders(graph_builders) => {
3054                                let builder = graph_builders.get_dfir_mut(&out_location);
3055                                builder.add_dfir(
3056                                    parse_quote! {
3057                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
3058                                    },
3059                                    None,
3060                                    Some(&next_stmt_id.to_string()),
3061                                );
3062                            }
3063                            BuildersOrCallback::Callback(_, node_callback) => {
3064                                node_callback(node, next_stmt_id);
3065                            }
3066                        }
3067
3068                        *next_stmt_id += 1;
3069
3070                        ident_stack.push(defer_tick_ident);
3071                    }
3072
3073                    HydroNode::Enumerate { input, .. } => {
3074                        let input_ident = ident_stack.pop().unwrap();
3075
3076                        let enumerate_ident =
3077                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3078
3079                        match builders_or_callback {
3080                            BuildersOrCallback::Builders(graph_builders) => {
3081                                let builder = graph_builders.get_dfir_mut(&out_location);
3082                                let lifetime = if input.metadata().location_id.is_top_level() {
3083                                    quote!('static)
3084                                } else {
3085                                    quote!('tick)
3086                                };
3087                                builder.add_dfir(
3088                                    parse_quote! {
3089                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3090                                    },
3091                                    None,
3092                                    Some(&next_stmt_id.to_string()),
3093                                );
3094                            }
3095                            BuildersOrCallback::Callback(_, node_callback) => {
3096                                node_callback(node, next_stmt_id);
3097                            }
3098                        }
3099
3100                        *next_stmt_id += 1;
3101
3102                        ident_stack.push(enumerate_ident);
3103                    }
3104
3105                    HydroNode::Inspect { f, .. } => {
3106                        let input_ident = ident_stack.pop().unwrap();
3107
3108                        let inspect_ident =
3109                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3110
3111                        match builders_or_callback {
3112                            BuildersOrCallback::Builders(graph_builders) => {
3113                                let builder = graph_builders.get_dfir_mut(&out_location);
3114                                builder.add_dfir(
3115                                    parse_quote! {
3116                                        #inspect_ident = #input_ident -> inspect(#f);
3117                                    },
3118                                    None,
3119                                    Some(&next_stmt_id.to_string()),
3120                                );
3121                            }
3122                            BuildersOrCallback::Callback(_, node_callback) => {
3123                                node_callback(node, next_stmt_id);
3124                            }
3125                        }
3126
3127                        *next_stmt_id += 1;
3128
3129                        ident_stack.push(inspect_ident);
3130                    }
3131
3132                    HydroNode::Unique { input, .. } => {
3133                        let input_ident = ident_stack.pop().unwrap();
3134
3135                        let unique_ident =
3136                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3137
3138                        match builders_or_callback {
3139                            BuildersOrCallback::Builders(graph_builders) => {
3140                                let builder = graph_builders.get_dfir_mut(&out_location);
3141                                let lifetime = if input.metadata().location_id.is_top_level() {
3142                                    quote!('static)
3143                                } else {
3144                                    quote!('tick)
3145                                };
3146
3147                                builder.add_dfir(
3148                                    parse_quote! {
3149                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3150                                    },
3151                                    None,
3152                                    Some(&next_stmt_id.to_string()),
3153                                );
3154                            }
3155                            BuildersOrCallback::Callback(_, node_callback) => {
3156                                node_callback(node, next_stmt_id);
3157                            }
3158                        }
3159
3160                        *next_stmt_id += 1;
3161
3162                        ident_stack.push(unique_ident);
3163                    }
3164
3165                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3166                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3167                            if input.metadata().location_id.is_top_level()
3168                                && input.metadata().collection_kind.is_bounded()
3169                            {
3170                                parse_quote!(fold_no_replay)
3171                            } else {
3172                                parse_quote!(fold)
3173                            }
3174                        } else if matches!(node, HydroNode::Scan { .. }) {
3175                            parse_quote!(scan)
3176                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3177                            if input.metadata().location_id.is_top_level()
3178                                && input.metadata().collection_kind.is_bounded()
3179                            {
3180                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3181                            } else {
3182                                parse_quote!(fold_keyed)
3183                            }
3184                        } else {
3185                            unreachable!()
3186                        };
3187
3188                        let (HydroNode::Fold { input, .. }
3189                        | HydroNode::FoldKeyed { input, .. }
3190                        | HydroNode::Scan { input, .. }) = node
3191                        else {
3192                            unreachable!()
3193                        };
3194
3195                        let lifetime = if input.metadata().location_id.is_top_level() {
3196                            quote!('static)
3197                        } else {
3198                            quote!('tick)
3199                        };
3200
3201                        let input_ident = ident_stack.pop().unwrap();
3202
3203                        let (HydroNode::Fold { init, acc, .. }
3204                        | HydroNode::FoldKeyed { init, acc, .. }
3205                        | HydroNode::Scan { init, acc, .. }) = &*node
3206                        else {
3207                            unreachable!()
3208                        };
3209
3210                        let fold_ident =
3211                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3212
3213                        match builders_or_callback {
3214                            BuildersOrCallback::Builders(graph_builders) => {
3215                                if matches!(node, HydroNode::Fold { .. })
3216                                    && node.metadata().location_id.is_top_level()
3217                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3218                                    && graph_builders.singleton_intermediates()
3219                                    && !node.metadata().collection_kind.is_bounded()
3220                                {
3221                                    let builder = graph_builders.get_dfir_mut(&out_location);
3222
3223                                    let acc: syn::Expr = parse_quote!({
3224                                        let mut __inner = #acc;
3225                                        move |__state, __value| {
3226                                            __inner(__state, __value);
3227                                            Some(__state.clone())
3228                                        }
3229                                    });
3230
3231                                    builder.add_dfir(
3232                                        parse_quote! {
3233                                            source_iter([(#init)()]) -> [0]#fold_ident;
3234                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3235                                            #fold_ident = chain();
3236                                        },
3237                                        None,
3238                                        Some(&next_stmt_id.to_string()),
3239                                    );
3240                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3241                                    && node.metadata().location_id.is_top_level()
3242                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3243                                    && graph_builders.singleton_intermediates()
3244                                    && !node.metadata().collection_kind.is_bounded()
3245                                {
3246                                    let builder = graph_builders.get_dfir_mut(&out_location);
3247
3248                                    let acc: syn::Expr = parse_quote!({
3249                                        let mut __init = #init;
3250                                        let mut __inner = #acc;
3251                                        move |__state, __kv: (_, _)| {
3252                                            // TODO(shadaj): we can avoid the clone when the entry exists
3253                                            let __state = __state
3254                                                .entry(::std::clone::Clone::clone(&__kv.0))
3255                                                .or_insert_with(|| (__init)());
3256                                            __inner(__state, __kv.1);
3257                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3258                                        }
3259                                    });
3260
3261                                    builder.add_dfir(
3262                                        parse_quote! {
3263                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3264                                        },
3265                                        None,
3266                                        Some(&next_stmt_id.to_string()),
3267                                    );
3268                                } else {
3269                                    let builder = graph_builders.get_dfir_mut(&out_location);
3270                                    builder.add_dfir(
3271                                        parse_quote! {
3272                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3273                                        },
3274                                        None,
3275                                        Some(&next_stmt_id.to_string()),
3276                                    );
3277                                }
3278                            }
3279                            BuildersOrCallback::Callback(_, node_callback) => {
3280                                node_callback(node, next_stmt_id);
3281                            }
3282                        }
3283
3284                        *next_stmt_id += 1;
3285
3286                        ident_stack.push(fold_ident);
3287                    }
3288
3289                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3290                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3291                            if input.metadata().location_id.is_top_level()
3292                                && input.metadata().collection_kind.is_bounded()
3293                            {
3294                                parse_quote!(reduce_no_replay)
3295                            } else {
3296                                parse_quote!(reduce)
3297                            }
3298                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3299                            if input.metadata().location_id.is_top_level()
3300                                && input.metadata().collection_kind.is_bounded()
3301                            {
3302                                todo!(
3303                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3304                                )
3305                            } else {
3306                                parse_quote!(reduce_keyed)
3307                            }
3308                        } else {
3309                            unreachable!()
3310                        };
3311
3312                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3313                        else {
3314                            unreachable!()
3315                        };
3316
3317                        let lifetime = if input.metadata().location_id.is_top_level() {
3318                            quote!('static)
3319                        } else {
3320                            quote!('tick)
3321                        };
3322
3323                        let input_ident = ident_stack.pop().unwrap();
3324
3325                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3326                        else {
3327                            unreachable!()
3328                        };
3329
3330                        let reduce_ident =
3331                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3332
3333                        match builders_or_callback {
3334                            BuildersOrCallback::Builders(graph_builders) => {
3335                                if matches!(node, HydroNode::Reduce { .. })
3336                                    && node.metadata().location_id.is_top_level()
3337                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3338                                    && graph_builders.singleton_intermediates()
3339                                    && !node.metadata().collection_kind.is_bounded()
3340                                {
3341                                    todo!(
3342                                        "Reduce with optional intermediates is not yet supported in simulator"
3343                                    );
3344                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3345                                    && node.metadata().location_id.is_top_level()
3346                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3347                                    && graph_builders.singleton_intermediates()
3348                                    && !node.metadata().collection_kind.is_bounded()
3349                                {
3350                                    todo!(
3351                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3352                                    );
3353                                } else {
3354                                    let builder = graph_builders.get_dfir_mut(&out_location);
3355                                    builder.add_dfir(
3356                                        parse_quote! {
3357                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3358                                        },
3359                                        None,
3360                                        Some(&next_stmt_id.to_string()),
3361                                    );
3362                                }
3363                            }
3364                            BuildersOrCallback::Callback(_, node_callback) => {
3365                                node_callback(node, next_stmt_id);
3366                            }
3367                        }
3368
3369                        *next_stmt_id += 1;
3370
3371                        ident_stack.push(reduce_ident);
3372                    }
3373
3374                    HydroNode::ReduceKeyedWatermark {
3375                        f,
3376                        input,
3377                        metadata,
3378                        ..
3379                    } => {
3380                        let lifetime = if input.metadata().location_id.is_top_level() {
3381                            quote!('static)
3382                        } else {
3383                            quote!('tick)
3384                        };
3385
3386                        // watermark is processed second, so it's on top
3387                        let watermark_ident = ident_stack.pop().unwrap();
3388                        let input_ident = ident_stack.pop().unwrap();
3389
3390                        let chain_ident = syn::Ident::new(
3391                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3392                            Span::call_site(),
3393                        );
3394
3395                        let fold_ident =
3396                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3397
3398                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3399                            && input.metadata().collection_kind.is_bounded()
3400                        {
3401                            parse_quote!(fold_no_replay)
3402                        } else {
3403                            parse_quote!(fold)
3404                        };
3405
3406                        match builders_or_callback {
3407                            BuildersOrCallback::Builders(graph_builders) => {
3408                                if metadata.location_id.is_top_level()
3409                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3410                                    && graph_builders.singleton_intermediates()
3411                                    && !metadata.collection_kind.is_bounded()
3412                                {
3413                                    todo!(
3414                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3415                                    )
3416                                } else {
3417                                    let builder = graph_builders.get_dfir_mut(&out_location);
3418                                    builder.add_dfir(
3419                                        parse_quote! {
3420                                            #chain_ident = chain();
3421                                            #input_ident
3422                                                -> map(|x| (Some(x), None))
3423                                                -> [0]#chain_ident;
3424                                            #watermark_ident
3425                                                -> map(|watermark| (None, Some(watermark)))
3426                                                -> [1]#chain_ident;
3427
3428                                            #fold_ident = #chain_ident
3429                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3430                                                    let __reduce_keyed_fn = #f;
3431                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3432                                                        if let Some((k, v)) = opt_payload {
3433                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3434                                                                if k <= curr_watermark {
3435                                                                    return;
3436                                                                }
3437                                                            }
3438                                                            match map.entry(k) {
3439                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3440                                                                    e.insert(v);
3441                                                                }
3442                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3443                                                                    __reduce_keyed_fn(e.get_mut(), v);
3444                                                                }
3445                                                            }
3446                                                        } else {
3447                                                            let watermark = opt_watermark.unwrap();
3448                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3449                                                                if watermark <= curr_watermark {
3450                                                                    return;
3451                                                                }
3452                                                            }
3453                                                            *opt_curr_watermark = opt_watermark;
3454                                                            map.retain(|k, _| *k > watermark);
3455                                                        }
3456                                                    }
3457                                                })
3458                                                -> flat_map(|(map, _curr_watermark)| map);
3459                                        },
3460                                        None,
3461                                        Some(&next_stmt_id.to_string()),
3462                                    );
3463                                }
3464                            }
3465                            BuildersOrCallback::Callback(_, node_callback) => {
3466                                node_callback(node, next_stmt_id);
3467                            }
3468                        }
3469
3470                        *next_stmt_id += 1;
3471
3472                        ident_stack.push(fold_ident);
3473                    }
3474
3475                    HydroNode::Network {
3476                        networking_info,
3477                        serialize_fn: serialize_pipeline,
3478                        instantiate_fn,
3479                        deserialize_fn: deserialize_pipeline,
3480                        input,
3481                        ..
3482                    } => {
3483                        let input_ident = ident_stack.pop().unwrap();
3484
3485                        let receiver_stream_ident =
3486                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3487
3488                        match builders_or_callback {
3489                            BuildersOrCallback::Builders(graph_builders) => {
3490                                let (sink_expr, source_expr) = match instantiate_fn {
3491                                    DebugInstantiate::Building => (
3492                                        syn::parse_quote!(DUMMY_SINK),
3493                                        syn::parse_quote!(DUMMY_SOURCE),
3494                                    ),
3495
3496                                    DebugInstantiate::Finalized(finalized) => {
3497                                        (finalized.sink.clone(), finalized.source.clone())
3498                                    }
3499                                };
3500
3501                                graph_builders.create_network(
3502                                    &input.metadata().location_id,
3503                                    &out_location,
3504                                    input_ident,
3505                                    &receiver_stream_ident,
3506                                    serialize_pipeline.as_ref(),
3507                                    sink_expr,
3508                                    source_expr,
3509                                    deserialize_pipeline.as_ref(),
3510                                    *next_stmt_id,
3511                                    networking_info,
3512                                );
3513                            }
3514                            BuildersOrCallback::Callback(_, node_callback) => {
3515                                node_callback(node, next_stmt_id);
3516                            }
3517                        }
3518
3519                        *next_stmt_id += 1;
3520
3521                        ident_stack.push(receiver_stream_ident);
3522                    }
3523
3524                    HydroNode::ExternalInput {
3525                        instantiate_fn,
3526                        deserialize_fn: deserialize_pipeline,
3527                        ..
3528                    } => {
3529                        let receiver_stream_ident =
3530                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3531
3532                        match builders_or_callback {
3533                            BuildersOrCallback::Builders(graph_builders) => {
3534                                let (_, source_expr) = match instantiate_fn {
3535                                    DebugInstantiate::Building => (
3536                                        syn::parse_quote!(DUMMY_SINK),
3537                                        syn::parse_quote!(DUMMY_SOURCE),
3538                                    ),
3539
3540                                    DebugInstantiate::Finalized(finalized) => {
3541                                        (finalized.sink.clone(), finalized.source.clone())
3542                                    }
3543                                };
3544
3545                                graph_builders.create_external_source(
3546                                    &out_location,
3547                                    source_expr,
3548                                    &receiver_stream_ident,
3549                                    deserialize_pipeline.as_ref(),
3550                                    *next_stmt_id,
3551                                );
3552                            }
3553                            BuildersOrCallback::Callback(_, node_callback) => {
3554                                node_callback(node, next_stmt_id);
3555                            }
3556                        }
3557
3558                        *next_stmt_id += 1;
3559
3560                        ident_stack.push(receiver_stream_ident);
3561                    }
3562
3563                    HydroNode::Counter {
3564                        tag,
3565                        duration,
3566                        prefix,
3567                        ..
3568                    } => {
3569                        let input_ident = ident_stack.pop().unwrap();
3570
3571                        let counter_ident =
3572                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3573
3574                        match builders_or_callback {
3575                            BuildersOrCallback::Builders(graph_builders) => {
3576                                let arg = format!("{}({})", prefix, tag);
3577                                let builder = graph_builders.get_dfir_mut(&out_location);
3578                                builder.add_dfir(
3579                                    parse_quote! {
3580                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
3581                                    },
3582                                    None,
3583                                    Some(&next_stmt_id.to_string()),
3584                                );
3585                            }
3586                            BuildersOrCallback::Callback(_, node_callback) => {
3587                                node_callback(node, next_stmt_id);
3588                            }
3589                        }
3590
3591                        *next_stmt_id += 1;
3592
3593                        ident_stack.push(counter_ident);
3594                    }
3595                }
3596            },
3597            seen_tees,
3598            false,
3599        );
3600
3601        ident_stack
3602            .pop()
3603            .expect("ident_stack should have exactly one element after traversal")
3604    }
3605
3606    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3607        match self {
3608            HydroNode::Placeholder => {
3609                panic!()
3610            }
3611            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3612            HydroNode::Source { source, .. } => match source {
3613                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3614                HydroSource::ExternalNetwork()
3615                | HydroSource::Spin()
3616                | HydroSource::ClusterMembers(_, _)
3617                | HydroSource::Embedded(_) => {} // TODO: what goes here?
3618            },
3619            HydroNode::SingletonSource { value, .. } => {
3620                transform(value);
3621            }
3622            HydroNode::CycleSource { .. }
3623            | HydroNode::Tee { .. }
3624            | HydroNode::YieldConcat { .. }
3625            | HydroNode::BeginAtomic { .. }
3626            | HydroNode::EndAtomic { .. }
3627            | HydroNode::Batch { .. }
3628            | HydroNode::Chain { .. }
3629            | HydroNode::ChainFirst { .. }
3630            | HydroNode::CrossProduct { .. }
3631            | HydroNode::CrossSingleton { .. }
3632            | HydroNode::ResolveFutures { .. }
3633            | HydroNode::ResolveFuturesOrdered { .. }
3634            | HydroNode::Join { .. }
3635            | HydroNode::Difference { .. }
3636            | HydroNode::AntiJoin { .. }
3637            | HydroNode::DeferTick { .. }
3638            | HydroNode::Enumerate { .. }
3639            | HydroNode::Unique { .. }
3640            | HydroNode::Sort { .. } => {}
3641            HydroNode::Map { f, .. }
3642            | HydroNode::FlatMap { f, .. }
3643            | HydroNode::Filter { f, .. }
3644            | HydroNode::FilterMap { f, .. }
3645            | HydroNode::Inspect { f, .. }
3646            | HydroNode::Reduce { f, .. }
3647            | HydroNode::ReduceKeyed { f, .. }
3648            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3649                transform(f);
3650            }
3651            HydroNode::Fold { init, acc, .. }
3652            | HydroNode::Scan { init, acc, .. }
3653            | HydroNode::FoldKeyed { init, acc, .. } => {
3654                transform(init);
3655                transform(acc);
3656            }
3657            HydroNode::Network {
3658                serialize_fn,
3659                deserialize_fn,
3660                ..
3661            } => {
3662                if let Some(serialize_fn) = serialize_fn {
3663                    transform(serialize_fn);
3664                }
3665                if let Some(deserialize_fn) = deserialize_fn {
3666                    transform(deserialize_fn);
3667                }
3668            }
3669            HydroNode::ExternalInput { deserialize_fn, .. } => {
3670                if let Some(deserialize_fn) = deserialize_fn {
3671                    transform(deserialize_fn);
3672                }
3673            }
3674            HydroNode::Counter { duration, .. } => {
3675                transform(duration);
3676            }
3677        }
3678    }
3679
3680    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3681        &self.metadata().op
3682    }
3683
3684    pub fn metadata(&self) -> &HydroIrMetadata {
3685        match self {
3686            HydroNode::Placeholder => {
3687                panic!()
3688            }
3689            HydroNode::Cast { metadata, .. } => metadata,
3690            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3691            HydroNode::Source { metadata, .. } => metadata,
3692            HydroNode::SingletonSource { metadata, .. } => metadata,
3693            HydroNode::CycleSource { metadata, .. } => metadata,
3694            HydroNode::Tee { metadata, .. } => metadata,
3695            HydroNode::YieldConcat { metadata, .. } => metadata,
3696            HydroNode::BeginAtomic { metadata, .. } => metadata,
3697            HydroNode::EndAtomic { metadata, .. } => metadata,
3698            HydroNode::Batch { metadata, .. } => metadata,
3699            HydroNode::Chain { metadata, .. } => metadata,
3700            HydroNode::ChainFirst { metadata, .. } => metadata,
3701            HydroNode::CrossProduct { metadata, .. } => metadata,
3702            HydroNode::CrossSingleton { metadata, .. } => metadata,
3703            HydroNode::Join { metadata, .. } => metadata,
3704            HydroNode::Difference { metadata, .. } => metadata,
3705            HydroNode::AntiJoin { metadata, .. } => metadata,
3706            HydroNode::ResolveFutures { metadata, .. } => metadata,
3707            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3708            HydroNode::Map { metadata, .. } => metadata,
3709            HydroNode::FlatMap { metadata, .. } => metadata,
3710            HydroNode::Filter { metadata, .. } => metadata,
3711            HydroNode::FilterMap { metadata, .. } => metadata,
3712            HydroNode::DeferTick { metadata, .. } => metadata,
3713            HydroNode::Enumerate { metadata, .. } => metadata,
3714            HydroNode::Inspect { metadata, .. } => metadata,
3715            HydroNode::Unique { metadata, .. } => metadata,
3716            HydroNode::Sort { metadata, .. } => metadata,
3717            HydroNode::Scan { metadata, .. } => metadata,
3718            HydroNode::Fold { metadata, .. } => metadata,
3719            HydroNode::FoldKeyed { metadata, .. } => metadata,
3720            HydroNode::Reduce { metadata, .. } => metadata,
3721            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3722            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3723            HydroNode::ExternalInput { metadata, .. } => metadata,
3724            HydroNode::Network { metadata, .. } => metadata,
3725            HydroNode::Counter { metadata, .. } => metadata,
3726        }
3727    }
3728
3729    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3730        &mut self.metadata_mut().op
3731    }
3732
3733    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3734        match self {
3735            HydroNode::Placeholder => {
3736                panic!()
3737            }
3738            HydroNode::Cast { metadata, .. } => metadata,
3739            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3740            HydroNode::Source { metadata, .. } => metadata,
3741            HydroNode::SingletonSource { metadata, .. } => metadata,
3742            HydroNode::CycleSource { metadata, .. } => metadata,
3743            HydroNode::Tee { metadata, .. } => metadata,
3744            HydroNode::YieldConcat { metadata, .. } => metadata,
3745            HydroNode::BeginAtomic { metadata, .. } => metadata,
3746            HydroNode::EndAtomic { metadata, .. } => metadata,
3747            HydroNode::Batch { metadata, .. } => metadata,
3748            HydroNode::Chain { metadata, .. } => metadata,
3749            HydroNode::ChainFirst { metadata, .. } => metadata,
3750            HydroNode::CrossProduct { metadata, .. } => metadata,
3751            HydroNode::CrossSingleton { metadata, .. } => metadata,
3752            HydroNode::Join { metadata, .. } => metadata,
3753            HydroNode::Difference { metadata, .. } => metadata,
3754            HydroNode::AntiJoin { metadata, .. } => metadata,
3755            HydroNode::ResolveFutures { metadata, .. } => metadata,
3756            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3757            HydroNode::Map { metadata, .. } => metadata,
3758            HydroNode::FlatMap { metadata, .. } => metadata,
3759            HydroNode::Filter { metadata, .. } => metadata,
3760            HydroNode::FilterMap { metadata, .. } => metadata,
3761            HydroNode::DeferTick { metadata, .. } => metadata,
3762            HydroNode::Enumerate { metadata, .. } => metadata,
3763            HydroNode::Inspect { metadata, .. } => metadata,
3764            HydroNode::Unique { metadata, .. } => metadata,
3765            HydroNode::Sort { metadata, .. } => metadata,
3766            HydroNode::Scan { metadata, .. } => metadata,
3767            HydroNode::Fold { metadata, .. } => metadata,
3768            HydroNode::FoldKeyed { metadata, .. } => metadata,
3769            HydroNode::Reduce { metadata, .. } => metadata,
3770            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3771            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3772            HydroNode::ExternalInput { metadata, .. } => metadata,
3773            HydroNode::Network { metadata, .. } => metadata,
3774            HydroNode::Counter { metadata, .. } => metadata,
3775        }
3776    }
3777
3778    pub fn input(&self) -> Vec<&HydroNode> {
3779        match self {
3780            HydroNode::Placeholder => {
3781                panic!()
3782            }
3783            HydroNode::Source { .. }
3784            | HydroNode::SingletonSource { .. }
3785            | HydroNode::ExternalInput { .. }
3786            | HydroNode::CycleSource { .. }
3787            | HydroNode::Tee { .. } => {
3788                // Tee should find its input in separate special ways
3789                vec![]
3790            }
3791            HydroNode::Cast { inner, .. }
3792            | HydroNode::ObserveNonDet { inner, .. }
3793            | HydroNode::YieldConcat { inner, .. }
3794            | HydroNode::BeginAtomic { inner, .. }
3795            | HydroNode::EndAtomic { inner, .. }
3796            | HydroNode::Batch { inner, .. } => {
3797                vec![inner]
3798            }
3799            HydroNode::Chain { first, second, .. } => {
3800                vec![first, second]
3801            }
3802            HydroNode::ChainFirst { first, second, .. } => {
3803                vec![first, second]
3804            }
3805            HydroNode::CrossProduct { left, right, .. }
3806            | HydroNode::CrossSingleton { left, right, .. }
3807            | HydroNode::Join { left, right, .. } => {
3808                vec![left, right]
3809            }
3810            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3811                vec![pos, neg]
3812            }
3813            HydroNode::Map { input, .. }
3814            | HydroNode::FlatMap { input, .. }
3815            | HydroNode::Filter { input, .. }
3816            | HydroNode::FilterMap { input, .. }
3817            | HydroNode::Sort { input, .. }
3818            | HydroNode::DeferTick { input, .. }
3819            | HydroNode::Enumerate { input, .. }
3820            | HydroNode::Inspect { input, .. }
3821            | HydroNode::Unique { input, .. }
3822            | HydroNode::Network { input, .. }
3823            | HydroNode::Counter { input, .. }
3824            | HydroNode::ResolveFutures { input, .. }
3825            | HydroNode::ResolveFuturesOrdered { input, .. }
3826            | HydroNode::Fold { input, .. }
3827            | HydroNode::FoldKeyed { input, .. }
3828            | HydroNode::Reduce { input, .. }
3829            | HydroNode::ReduceKeyed { input, .. }
3830            | HydroNode::Scan { input, .. } => {
3831                vec![input]
3832            }
3833            HydroNode::ReduceKeyedWatermark {
3834                input, watermark, ..
3835            } => {
3836                vec![input, watermark]
3837            }
3838        }
3839    }
3840
3841    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3842        self.input()
3843            .iter()
3844            .map(|input_node| input_node.metadata())
3845            .collect()
3846    }
3847
3848    pub fn print_root(&self) -> String {
3849        match self {
3850            HydroNode::Placeholder => {
3851                panic!()
3852            }
3853            HydroNode::Cast { .. } => "Cast()".to_owned(),
3854            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3855            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3856            HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3857            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
3858            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3859            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3860            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3861            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3862            HydroNode::Batch { .. } => "Batch()".to_owned(),
3863            HydroNode::Chain { first, second, .. } => {
3864                format!("Chain({}, {})", first.print_root(), second.print_root())
3865            }
3866            HydroNode::ChainFirst { first, second, .. } => {
3867                format!(
3868                    "ChainFirst({}, {})",
3869                    first.print_root(),
3870                    second.print_root()
3871                )
3872            }
3873            HydroNode::CrossProduct { left, right, .. } => {
3874                format!(
3875                    "CrossProduct({}, {})",
3876                    left.print_root(),
3877                    right.print_root()
3878                )
3879            }
3880            HydroNode::CrossSingleton { left, right, .. } => {
3881                format!(
3882                    "CrossSingleton({}, {})",
3883                    left.print_root(),
3884                    right.print_root()
3885                )
3886            }
3887            HydroNode::Join { left, right, .. } => {
3888                format!("Join({}, {})", left.print_root(), right.print_root())
3889            }
3890            HydroNode::Difference { pos, neg, .. } => {
3891                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3892            }
3893            HydroNode::AntiJoin { pos, neg, .. } => {
3894                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3895            }
3896            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3897            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3898            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3899            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3900            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3901            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3902            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3903            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3904            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3905            HydroNode::Unique { .. } => "Unique()".to_owned(),
3906            HydroNode::Sort { .. } => "Sort()".to_owned(),
3907            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3908            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3909            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3910            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3911            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3912            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3913            HydroNode::Network { .. } => "Network()".to_owned(),
3914            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3915            HydroNode::Counter { tag, duration, .. } => {
3916                format!("Counter({:?}, {:?})", tag, duration)
3917            }
3918        }
3919    }
3920}
3921
3922#[cfg(feature = "build")]
3923fn instantiate_network<'a, D>(
3924    env: &mut D::InstantiateEnv,
3925    from_location: &LocationId,
3926    to_location: &LocationId,
3927    processes: &SparseSecondaryMap<LocationKey, D::Process>,
3928    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
3929    name: Option<&str>,
3930    networking_info: &crate::networking::NetworkingInfo,
3931) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3932where
3933    D: Deploy<'a>,
3934{
3935    let ((sink, source), connect_fn) = match (from_location, to_location) {
3936        (&LocationId::Process(from), &LocationId::Process(to)) => {
3937            let from_node = processes
3938                .get(from)
3939                .unwrap_or_else(|| {
3940                    panic!("A process used in the graph was not instantiated: {}", from)
3941                })
3942                .clone();
3943            let to_node = processes
3944                .get(to)
3945                .unwrap_or_else(|| {
3946                    panic!("A process used in the graph was not instantiated: {}", to)
3947                })
3948                .clone();
3949
3950            let sink_port = from_node.next_port();
3951            let source_port = to_node.next_port();
3952
3953            (
3954                D::o2o_sink_source(
3955                    env,
3956                    &from_node,
3957                    &sink_port,
3958                    &to_node,
3959                    &source_port,
3960                    name,
3961                    networking_info,
3962                ),
3963                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3964            )
3965        }
3966        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
3967            let from_node = processes
3968                .get(from)
3969                .unwrap_or_else(|| {
3970                    panic!("A process used in the graph was not instantiated: {}", from)
3971                })
3972                .clone();
3973            let to_node = clusters
3974                .get(to)
3975                .unwrap_or_else(|| {
3976                    panic!("A cluster used in the graph was not instantiated: {}", to)
3977                })
3978                .clone();
3979
3980            let sink_port = from_node.next_port();
3981            let source_port = to_node.next_port();
3982
3983            (
3984                D::o2m_sink_source(
3985                    env,
3986                    &from_node,
3987                    &sink_port,
3988                    &to_node,
3989                    &source_port,
3990                    name,
3991                    networking_info,
3992                ),
3993                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3994            )
3995        }
3996        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
3997            let from_node = clusters
3998                .get(from)
3999                .unwrap_or_else(|| {
4000                    panic!("A cluster used in the graph was not instantiated: {}", from)
4001                })
4002                .clone();
4003            let to_node = processes
4004                .get(to)
4005                .unwrap_or_else(|| {
4006                    panic!("A process used in the graph was not instantiated: {}", to)
4007                })
4008                .clone();
4009
4010            let sink_port = from_node.next_port();
4011            let source_port = to_node.next_port();
4012
4013            (
4014                D::m2o_sink_source(
4015                    env,
4016                    &from_node,
4017                    &sink_port,
4018                    &to_node,
4019                    &source_port,
4020                    name,
4021                    networking_info,
4022                ),
4023                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
4024            )
4025        }
4026        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4027            let from_node = clusters
4028                .get(from)
4029                .unwrap_or_else(|| {
4030                    panic!("A cluster used in the graph was not instantiated: {}", from)
4031                })
4032                .clone();
4033            let to_node = clusters
4034                .get(to)
4035                .unwrap_or_else(|| {
4036                    panic!("A cluster used in the graph was not instantiated: {}", to)
4037                })
4038                .clone();
4039
4040            let sink_port = from_node.next_port();
4041            let source_port = to_node.next_port();
4042
4043            (
4044                D::m2m_sink_source(
4045                    env,
4046                    &from_node,
4047                    &sink_port,
4048                    &to_node,
4049                    &source_port,
4050                    name,
4051                    networking_info,
4052                ),
4053                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
4054            )
4055        }
4056        (LocationId::Tick(_, _), _) => panic!(),
4057        (_, LocationId::Tick(_, _)) => panic!(),
4058        (LocationId::Atomic(_), _) => panic!(),
4059        (_, LocationId::Atomic(_)) => panic!(),
4060    };
4061    (sink, source, connect_fn)
4062}
4063
4064#[cfg(test)]
4065mod test {
4066    use std::mem::size_of;
4067
4068    use stageleft::{QuotedWithContext, q};
4069
4070    use super::*;
4071
4072    #[test]
4073    #[cfg_attr(
4074        not(feature = "build"),
4075        ignore = "expects inclusion of feature-gated fields"
4076    )]
4077    fn hydro_node_size() {
4078        assert_eq!(size_of::<HydroNode>(), 248);
4079    }
4080
4081    #[test]
4082    #[cfg_attr(
4083        not(feature = "build"),
4084        ignore = "expects inclusion of feature-gated fields"
4085    )]
4086    fn hydro_root_size() {
4087        assert_eq!(size_of::<HydroRoot>(), 136);
4088    }
4089
4090    #[test]
4091    fn test_simplify_q_macro_basic() {
4092        // Test basic non-q! expression
4093        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
4094        let result = simplify_q_macro(simple_expr.clone());
4095        assert_eq!(result, simple_expr);
4096    }
4097
4098    #[test]
4099    fn test_simplify_q_macro_actual_stageleft_call() {
4100        // Test a simplified version of what a real stageleft call might look like
4101        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
4102        let result = simplify_q_macro(stageleft_call);
4103        // This should be processed by our visitor and simplified to q!(...)
4104        // since we detect the stageleft::runtime_support::fn_* pattern
4105        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4106    }
4107
4108    #[test]
4109    fn test_closure_no_pipe_at_start() {
4110        // Test a closure that does not start with a pipe
4111        let stageleft_call = q!({
4112            let foo = 123;
4113            move |b: usize| b + foo
4114        })
4115        .splice_fn1_ctx(&());
4116        let result = simplify_q_macro(stageleft_call);
4117        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4118    }
4119}