Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
/// Wrapper around a parsed expression that prints only the expression's tokens.
///
/// `Debug` writes the raw token stream; `Display` writes the expression wrapped
/// as `q!(...)` after running it through `simplify_q_macro`.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl From<syn::Expr> for DebugExpr {
44    fn from(expr: syn::Expr) -> Self {
45        Self(Box::new(expr))
46    }
47}
48
49impl Deref for DebugExpr {
50    type Target = syn::Expr;
51
52    fn deref(&self) -> &Self::Target {
53        &self.0
54    }
55}
56
57impl ToTokens for DebugExpr {
58    fn to_tokens(&self, tokens: &mut TokenStream) {
59        self.0.to_tokens(tokens);
60    }
61}
62
63impl Debug for DebugExpr {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        write!(f, "{}", self.0.to_token_stream())
66    }
67}
68
69impl Display for DebugExpr {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        let original = self.0.as_ref().clone();
72        let simplified = simplify_q_macro(original);
73
74        // For now, just use quote formatting without trying to parse as a statement
75        // This avoids the syn::parse_quote! issues entirely
76        write!(f, "q!({})", quote::quote!(#simplified))
77    }
78}
79
80/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
81fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
82    // Try to parse the token string as a syn::Expr
83    // Use a visitor to simplify q! macro expansions
84    let mut simplifier = QMacroSimplifier::new();
85    simplifier.visit_expr_mut(&mut expr);
86
87    // If we found and simplified a q! macro, return the simplified version
88    if let Some(simplified) = simplifier.simplified_result {
89        simplified
90    } else {
91        expr
92    }
93}
94
/// AST visitor that simplifies q! macro expansions.
///
/// The visitor records at most one result: once `simplified_result` is set,
/// further `visit_expr_mut` calls return immediately.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The expression extracted from the first matching
    /// `stageleft::runtime_support::*_type_hint*` call, or `None` if no such
    /// call has been visited yet.
    pub simplified_result: Option<syn::Expr>,
}
100
101impl QMacroSimplifier {
102    pub fn new() -> Self {
103        Self::default()
104    }
105}
106
107impl VisitMut for QMacroSimplifier {
108    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
109        // Check if we already found a result to avoid further processing
110        if self.simplified_result.is_some() {
111            return;
112        }
113
114        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
115            // Look for calls to stageleft::runtime_support::fn*
116            && self.is_stageleft_runtime_support_call(&path_expr.path)
117            // Try to extract the closure from the arguments
118            && let Some(closure) = self.extract_closure_from_args(&call.args)
119        {
120            self.simplified_result = Some(closure);
121            return;
122        }
123
124        // Continue visiting child expressions using the default implementation
125        // Use the default visitor to avoid infinite recursion
126        syn::visit_mut::visit_expr_mut(self, expr);
127    }
128}
129
130impl QMacroSimplifier {
131    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
132        // Check if this is a call to stageleft::runtime_support::fn*
133        if let Some(last_segment) = path.segments.last() {
134            let fn_name = last_segment.ident.to_string();
135            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
136            fn_name.contains("_type_hint")
137                && path.segments.len() > 2
138                && path.segments[0].ident == "stageleft"
139                && path.segments[1].ident == "runtime_support"
140        } else {
141            false
142        }
143    }
144
145    fn extract_closure_from_args(
146        &self,
147        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
148    ) -> Option<syn::Expr> {
149        // Look through the arguments for a closure expression
150        for arg in args {
151            if let syn::Expr::Closure(_) = arg {
152                return Some(arg.clone());
153            }
154            // Also check for closures nested in other expressions (like blocks)
155            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
156                return Some(closure_expr);
157            }
158        }
159        None
160    }
161
162    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
163        let mut visitor = ClosureFinder {
164            found_closure: None,
165            prefer_inner_blocks: true,
166        };
167        visitor.visit_expr(expr);
168        visitor.found_closure
169    }
170}
171
/// Visitor that finds closures in expressions with special block handling.
struct ClosureFinder {
    /// The first match found so far; once set, visiting stops. This is either
    /// a closure expression or, in the inner-block case, the block that
    /// contains the closure.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression whose direct statements include a nested
    /// block containing a closure yields that nested block instead of the bare
    /// closure.
    prefer_inner_blocks: bool,
}
177
178impl<'ast> Visit<'ast> for ClosureFinder {
179    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
180        // If we already found a closure, don't continue searching
181        if self.found_closure.is_some() {
182            return;
183        }
184
185        match expr {
186            syn::Expr::Closure(_) => {
187                self.found_closure = Some(expr.clone());
188            }
189            syn::Expr::Block(block) if self.prefer_inner_blocks => {
190                // Special handling for blocks - look for inner blocks that contain closures
191                for stmt in &block.block.stmts {
192                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
193                        && let syn::Expr::Block(_) = stmt_expr
194                    {
195                        // Check if this nested block contains a closure
196                        let mut inner_visitor = ClosureFinder {
197                            found_closure: None,
198                            prefer_inner_blocks: false, // Avoid infinite recursion
199                        };
200                        inner_visitor.visit_expr(stmt_expr);
201                        if inner_visitor.found_closure.is_some() {
202                            // Found a closure in an inner block, return that block
203                            self.found_closure = Some(stmt_expr.clone());
204                            return;
205                        }
206                    }
207                }
208
209                // If no inner block with closure found, continue with normal visitation
210                visit::visit_expr(self, expr);
211
212                // If we found a closure, just return the closure itself, not the whole block
213                // unless we're in the special case where we want the containing block
214                if self.found_closure.is_some() {
215                    // The closure was found during visitation, no need to wrap in block
216                }
217            }
218            _ => {
219                // Use default visitor behavior for all other expressions
220                visit::visit_expr(self, expr);
221            }
222        }
223    }
224}
225
/// Wrapper around a parsed type whose `Debug` output is the type's tokens.
///
/// Boxes `syn::Type` which is ~320 bytes.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
231
232impl From<syn::Type> for DebugType {
233    fn from(t: syn::Type) -> Self {
234        Self(Box::new(t))
235    }
236}
237
238impl Deref for DebugType {
239    type Target = syn::Type;
240
241    fn deref(&self) -> &Self::Target {
242        &self.0
243    }
244}
245
246impl ToTokens for DebugType {
247    fn to_tokens(&self, tokens: &mut TokenStream) {
248        self.0.to_tokens(tokens);
249    }
250}
251
252impl Debug for DebugType {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        write!(f, "{}", self.0.to_token_stream())
255    }
256}
257
/// Instantiation state of a network-like IR element.
///
/// `HydroRoot::compile_network` replaces `Building` with `Finalized`, and
/// panics with "network already finalized" if it meets a value that is
/// already `Finalized`.
pub enum DebugInstantiate {
    /// Not yet instantiated. This is the only variant that can be cloned.
    Building,
    /// Instantiated; holds the generated sink/source expressions and an
    /// optional connect callback. Cloning this variant panics.
    Finalized(Box<DebugInstantiateFinalized>),
}
262
/// Payload of [`DebugInstantiate::Finalized`].
///
/// The fields are private; `HydroRoot::compile_network` builds this struct
/// from the sink/source expressions and the connect callback it obtains.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side.
    sink: syn::Expr,
    /// Expression for the receiving side. Some `compile_network` paths store
    /// a `DUMMY` placeholder here.
    source: syn::Expr,
    /// One-shot callback; presumably run to wire up the connection once
    /// deployment is ready (TODO confirm against the code that consumes it).
    connect_fn: Option<Box<dyn FnOnce()>>,
}
275
276impl From<DebugInstantiateFinalized> for DebugInstantiate {
277    fn from(f: DebugInstantiateFinalized) -> Self {
278        Self::Finalized(Box::new(f))
279    }
280}
281
282impl Debug for DebugInstantiate {
283    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284        write!(f, "<network instantiate>")
285    }
286}
287
impl Hash for DebugInstantiate {
    /// Deliberately feeds nothing into the hasher, so every `DebugInstantiate`
    /// value contributes identically to a containing type's hash.
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // Do nothing
    }
}
293
294impl Clone for DebugInstantiate {
295    fn clone(&self) -> Self {
296        match self {
297            DebugInstantiate::Building => DebugInstantiate::Building,
298            DebugInstantiate::Finalized(_) => {
299                panic!("DebugInstantiate::Finalized should not be cloned")
300            }
301        }
302    }
303}
304
/// Tracks the instantiation state of a `ClusterMembers` source.
///
/// During `compile_network`, the first `ClusterMembers` node for a given
/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
/// receives the expression returned by `Deploy::cluster_membership_stream`.
/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
/// during code-gen they simply reference the tee output of the first node
/// instead of creating a redundant `source_stream`.
#[derive(Debug, Hash, Clone)]
pub enum ClusterMembersState {
    /// Not yet instantiated.
    Uninit,
    /// The primary instance: holds the stream expression and will emit
    /// `source_stream(expr) -> tee()` during code-gen.
    Stream(DebugExpr),
    /// A secondary instance that references the tee output of the primary.
    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
    /// can derive the deterministic tee ident without extra state.
    Tee(LocationId, LocationId),
}
325
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression; presumably one that evaluates to a
    /// stream (TODO confirm against code-gen).
    Stream(DebugExpr),
    /// Source fed by an external network; carries no data of its own.
    ExternalNetwork(),
    /// Source described by an expression; presumably one that evaluates to an
    /// iterator (TODO confirm against code-gen).
    Iter(DebugExpr),
    /// Source with no payload. NOTE(review): semantics are not visible in this
    /// part of the file.
    Spin(),
    /// Cluster membership source: a location plus the instantiation state
    /// described on [`ClusterMembersState`].
    ClusterMembers(LocationId, ClusterMembersState),
    /// Source identified by an ident; presumably supplied by an embedding
    /// host (TODO confirm).
    Embedded(syn::Ident),
    /// Like [`Self::Embedded`], presumably for a singleton value (TODO confirm).
    EmbeddedSingleton(syn::Ident),
}
337
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
///
/// In every method, `in_ident`/`input_ident` names the upstream DFIR value and
/// `out_ident` names the value the emitted code must define.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code that connects `in_ident` at `in_location` to `out_ident`
    /// at `out_location` for a batch operation.
    ///
    /// `in_kind` describes the input collection; implementations may emit
    /// different code depending on it.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code that connects `in_ident` at `in_location` to `out_ident`
    /// at `out_location` when a value is yielded out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code that connects `in_ident` to `out_ident` at the start of
    /// an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code that connects `in_ident` to `out_ident` at the end of
    /// an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code for a point where non-determinism is observed, connecting
    /// `in_ident` (of kind `in_kind`) to `out_ident` (of kind `out_kind`) at
    /// `location`.
    ///
    /// NOTE(review): the meaning of `trusted` is not visible in this part of
    /// the file.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network channel: a send side at `from` that
    /// feeds `input_ident` into `sink`, and a receive side at `to` that defines
    /// `out_ident` from `source`.
    ///
    /// `serialize` / `deserialize` are optional expressions applied to the
    /// data on the send / receive side respectively. `tag_id` distinguishes
    /// the operators emitted for this channel.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a source at location `on` that defines `out_ident` from
    /// `source_expr`, optionally applying `deserialize` to the data.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits a sink at location `on` that feeds `input_ident` into
    /// `sink_expr`, optionally applying `serialize` to the data first.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
431
/// Production code-gen: one `FlatGraphBuilder` per root location key.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up (or default-creates) the builder keyed by the root of `location`.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" if the map entry cannot be obtained.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Bounded singleton-like inputs are persisted with a `'static` lifetime;
    /// everything else is passed straight through.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let persist_input = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        if !persist_input {
            add_identity_stmt(builder, &in_ident, out_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Pass-through: `out_ident` is defined as `in_ident`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_identity_stmt(builder, &in_ident, out_ident);
    }

    /// Pass-through: `out_ident` is defined as `in_ident`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_identity_stmt(builder, &in_ident, out_ident);
    }

    /// Pass-through: `out_ident` is defined as `in_ident`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        add_identity_stmt(builder, &in_ident, out_ident);
    }

    /// Pass-through: `out_ident` is defined as `in_ident`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(location);
        add_identity_stmt(builder, &in_ident, out_ident);
    }

    /// Adds a `dest_sink` pipeline to the sender's graph and a `source_stream`
    /// pipeline to the receiver's graph, inserting `map(..)` stages for the
    /// optional (de)serializers.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // The operator tag separates send and receive, which otherwise have the
        // same next_stmt_id.
        let send_tag = format!("send{tag_id}");
        let recv_tag = format!("recv{tag_id}");

        let sender = self.get_dfir_mut(from);
        match serialize {
            Some(ser) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#ser) -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }

        let receiver = self.get_dfir_mut(to);
        match deserialize {
            Some(de) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#de);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Adds a `source_stream` pipeline (optionally followed by a `map` over
    /// the deserializer) to the graph of location `on`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{tag_id}");
        let receiver = self.get_dfir_mut(on);
        match deserialize {
            Some(de) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#de);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Adds a `dest_sink` pipeline (optionally preceded by a `map` over the
    /// serializer) to the graph of location `on`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // The operator tag separates send and receive, which otherwise have the
        // same next_stmt_id.
        let send_tag = format!("send{tag_id}");
        let sender = self.get_dfir_mut(on);
        match serialize {
            Some(ser) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#ser) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }
    }
}

/// Adds the pass-through statement `out_ident = in_ident;` to `builder`.
#[cfg(feature = "build")]
fn add_identity_stmt(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}
665
/// Either a DFIR builder to emit code into, or a pair of callbacks to invoke
/// on IR elements instead.
///
/// `L` is called with a root and `N` with a node; both also receive a mutable
/// `usize`. NOTE(review): the meaning of that counter is not visible in this
/// part of the file (presumably the next statement id; confirm at the call site).
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR through the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback (`L`) and the node callback (`N`).
    Callback(L, N),
}
675
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Root described by an expression `f` over `input`; presumably `f` is
    /// applied to each element (TODO confirm against code-gen).
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to a port on an external location.
    ///
    /// `compile_network` expects `instantiate_fn` to be
    /// `DebugInstantiate::Building` and replaces it with a finalized value;
    /// it panics if the value is already finalized.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root described by a `sink` expression and its `input`; presumably
    /// `input` is written into `sink` (TODO confirm against code-gen).
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root that ties `input` to the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Output registered under `ident` via `Deploy::register_embedded_output`.
    ///
    /// `compile_network` panics unless `input` has a `Stream` collection kind
    /// and is located on a process or cluster.
    EmbeddedOutput {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root with no fields beyond `input` and metadata. NOTE(review):
    /// semantics are not visible in this part of the file.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
716
717impl HydroRoot {
718    #[cfg(feature = "build")]
719    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
720    pub fn compile_network<'a, D>(
721        &mut self,
722        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
723        seen_tees: &mut SeenSharedNodes,
724        seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
725        processes: &SparseSecondaryMap<LocationKey, D::Process>,
726        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
727        externals: &SparseSecondaryMap<LocationKey, D::External>,
728        env: &mut D::InstantiateEnv,
729    ) where
730        D: Deploy<'a>,
731    {
732        let refcell_extra_stmts = RefCell::new(extra_stmts);
733        let refcell_env = RefCell::new(env);
734        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
735        self.transform_bottom_up(
736            &mut |l| {
737                if let HydroRoot::SendExternal {
738                    input,
739                    to_external_key,
740                    to_port_id,
741                    to_many,
742                    unpaired,
743                    instantiate_fn,
744                    ..
745                } = l
746                {
747                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
748                        DebugInstantiate::Building => {
749                            let to_node = externals
750                                .get(*to_external_key)
751                                .unwrap_or_else(|| {
752                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
753                                })
754                                .clone();
755
756                            match input.metadata().location_id.root() {
757                                &LocationId::Process(process_key) => {
758                                    if *to_many {
759                                        (
760                                            (
761                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
762                                                parse_quote!(DUMMY),
763                                            ),
764                                            Box::new(|| {}) as Box<dyn FnOnce()>,
765                                        )
766                                    } else {
767                                        let from_node = processes
768                                            .get(process_key)
769                                            .unwrap_or_else(|| {
770                                                panic!("A process used in the graph was not instantiated: {}", process_key)
771                                            })
772                                            .clone();
773
774                                        let sink_port = from_node.next_port();
775                                        let source_port = to_node.next_port();
776
777                                        if *unpaired {
778                                            use stageleft::quote_type;
779                                            use tokio_util::codec::LengthDelimitedCodec;
780
781                                            to_node.register(*to_port_id, source_port.clone());
782
783                                            let _ = D::e2o_source(
784                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
785                                                &to_node, &source_port,
786                                                &from_node, &sink_port,
787                                                &quote_type::<LengthDelimitedCodec>(),
788                                                format!("{}_{}", *to_external_key, *to_port_id)
789                                            );
790                                        }
791
792                                        (
793                                            (
794                                                D::o2e_sink(
795                                                    &from_node,
796                                                    &sink_port,
797                                                    &to_node,
798                                                    &source_port,
799                                                    format!("{}_{}", *to_external_key, *to_port_id)
800                                                ),
801                                                parse_quote!(DUMMY),
802                                            ),
803                                            if *unpaired {
804                                                D::e2o_connect(
805                                                    &to_node,
806                                                    &source_port,
807                                                    &from_node,
808                                                    &sink_port,
809                                                    *to_many,
810                                                    NetworkHint::Auto,
811                                                )
812                                            } else {
813                                                Box::new(|| {}) as Box<dyn FnOnce()>
814                                            },
815                                        )
816                                    }
817                                }
818                                LocationId::Cluster(cluster_key) => {
819                                    let from_node = clusters
820                                        .get(*cluster_key)
821                                        .unwrap_or_else(|| {
822                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
823                                        })
824                                        .clone();
825
826                                    let sink_port = from_node.next_port();
827                                    let source_port = to_node.next_port();
828
829                                    if *unpaired {
830                                        to_node.register(*to_port_id, source_port.clone());
831                                    }
832
833                                    (
834                                        (
835                                            D::m2e_sink(
836                                                &from_node,
837                                                &sink_port,
838                                                &to_node,
839                                                &source_port,
840                                                format!("{}_{}", *to_external_key, *to_port_id)
841                                            ),
842                                            parse_quote!(DUMMY),
843                                        ),
844                                        Box::new(|| {}) as Box<dyn FnOnce()>,
845                                    )
846                                }
847                                _ => panic!()
848                            }
849                        },
850
851                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
852                    };
853
854                    *instantiate_fn = DebugInstantiateFinalized {
855                        sink: sink_expr,
856                        source: source_expr,
857                        connect_fn: Some(connect_fn),
858                    }
859                    .into();
860                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
861                    let element_type = match &input.metadata().collection_kind {
862                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
863                        _ => panic!("Embedded output must have Stream collection kind"),
864                    };
865                    let location_key = match input.metadata().location_id.root() {
866                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
867                        _ => panic!("Embedded output must be on a process or cluster"),
868                    };
869                    D::register_embedded_output(
870                        &mut refcell_env.borrow_mut(),
871                        location_key,
872                        ident,
873                        &element_type,
874                    );
875                }
876            },
877            &mut |n| {
878                if let HydroNode::Network {
879                    name,
880                    networking_info,
881                    input,
882                    instantiate_fn,
883                    metadata,
884                    ..
885                } = n
886                {
887                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
888                        DebugInstantiate::Building => instantiate_network::<D>(
889                            &mut refcell_env.borrow_mut(),
890                            input.metadata().location_id.root(),
891                            metadata.location_id.root(),
892                            processes,
893                            clusters,
894                            name.as_deref(),
895                            networking_info,
896                        ),
897
898                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
899                    };
900
901                    *instantiate_fn = DebugInstantiateFinalized {
902                        sink: sink_expr,
903                        source: source_expr,
904                        connect_fn: Some(connect_fn),
905                    }
906                    .into();
907                } else if let HydroNode::ExternalInput {
908                    from_external_key,
909                    from_port_id,
910                    from_many,
911                    codec_type,
912                    port_hint,
913                    instantiate_fn,
914                    metadata,
915                    ..
916                } = n
917                {
918                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
919                        DebugInstantiate::Building => {
920                            let from_node = externals
921                                .get(*from_external_key)
922                                .unwrap_or_else(|| {
923                                    panic!(
924                                        "A external used in the graph was not instantiated: {}",
925                                        from_external_key,
926                                    )
927                                })
928                                .clone();
929
930                            match metadata.location_id.root() {
931                                &LocationId::Process(process_key) => {
932                                    let to_node = processes
933                                        .get(process_key)
934                                        .unwrap_or_else(|| {
935                                            panic!("A process used in the graph was not instantiated: {}", process_key)
936                                        })
937                                        .clone();
938
939                                    let sink_port = from_node.next_port();
940                                    let source_port = to_node.next_port();
941
942                                    from_node.register(*from_port_id, sink_port.clone());
943
944                                    (
945                                        (
946                                            parse_quote!(DUMMY),
947                                            if *from_many {
948                                                D::e2o_many_source(
949                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
950                                                    &to_node, &source_port,
951                                                    codec_type.0.as_ref(),
952                                                    format!("{}_{}", *from_external_key, *from_port_id)
953                                                )
954                                            } else {
955                                                D::e2o_source(
956                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
957                                                    &from_node, &sink_port,
958                                                    &to_node, &source_port,
959                                                    codec_type.0.as_ref(),
960                                                    format!("{}_{}", *from_external_key, *from_port_id)
961                                                )
962                                            },
963                                        ),
964                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
965                                    )
966                                }
967                                LocationId::Cluster(cluster_key) => {
968                                    let to_node = clusters
969                                        .get(*cluster_key)
970                                        .unwrap_or_else(|| {
971                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
972                                        })
973                                        .clone();
974
975                                    let sink_port = from_node.next_port();
976                                    let source_port = to_node.next_port();
977
978                                    from_node.register(*from_port_id, sink_port.clone());
979
980                                    (
981                                        (
982                                            parse_quote!(DUMMY),
983                                            D::e2m_source(
984                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
985                                                &from_node, &sink_port,
986                                                &to_node, &source_port,
987                                                codec_type.0.as_ref(),
988                                                format!("{}_{}", *from_external_key, *from_port_id)
989                                            ),
990                                        ),
991                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
992                                    )
993                                }
994                                _ => panic!()
995                            }
996                        },
997
998                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
999                    };
1000
1001                    *instantiate_fn = DebugInstantiateFinalized {
1002                        sink: sink_expr,
1003                        source: source_expr,
1004                        connect_fn: Some(connect_fn),
1005                    }
1006                    .into();
1007                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1008                    let element_type = match &metadata.collection_kind {
1009                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1010                        _ => panic!("Embedded source must have Stream collection kind"),
1011                    };
1012                    let location_key = match metadata.location_id.root() {
1013                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1014                        _ => panic!("Embedded source must be on a process or cluster"),
1015                    };
1016                    D::register_embedded_stream_input(
1017                        &mut refcell_env.borrow_mut(),
1018                        location_key,
1019                        ident,
1020                        &element_type,
1021                    );
1022                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1023                    let element_type = match &metadata.collection_kind {
1024                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1025                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1026                    };
1027                    let location_key = match metadata.location_id.root() {
1028                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1029                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1030                    };
1031                    D::register_embedded_singleton_input(
1032                        &mut refcell_env.borrow_mut(),
1033                        location_key,
1034                        ident,
1035                        &element_type,
1036                    );
1037                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1038                    match state {
1039                        ClusterMembersState::Uninit => {
1040                            let at_location = metadata.location_id.root().clone();
1041                            let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1042                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1043                                // First occurrence: call cluster_membership_stream and mark as Stream.
1044                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1045                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1046                                    &(),
1047                                );
1048                                *state = ClusterMembersState::Stream(expr.into());
1049                            } else {
1050                                // Already instantiated for this (at, target) pair: just tee.
1051                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1052                            }
1053                        }
1054                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1055                            panic!("cluster members already finalized");
1056                        }
1057                    }
1058                }
1059            },
1060            seen_tees,
1061            false,
1062        );
1063    }
1064
1065    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1066        self.transform_bottom_up(
1067            &mut |l| {
1068                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1069                    match instantiate_fn {
1070                        DebugInstantiate::Building => panic!("network not built"),
1071
1072                        DebugInstantiate::Finalized(finalized) => {
1073                            (finalized.connect_fn.take().unwrap())();
1074                        }
1075                    }
1076                }
1077            },
1078            &mut |n| {
1079                if let HydroNode::Network { instantiate_fn, .. }
1080                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1081                {
1082                    match instantiate_fn {
1083                        DebugInstantiate::Building => panic!("network not built"),
1084
1085                        DebugInstantiate::Finalized(finalized) => {
1086                            (finalized.connect_fn.take().unwrap())();
1087                        }
1088                    }
1089                }
1090            },
1091            seen_tees,
1092            false,
1093        );
1094    }
1095
1096    pub fn transform_bottom_up(
1097        &mut self,
1098        transform_root: &mut impl FnMut(&mut HydroRoot),
1099        transform_node: &mut impl FnMut(&mut HydroNode),
1100        seen_tees: &mut SeenSharedNodes,
1101        check_well_formed: bool,
1102    ) {
1103        self.transform_children(
1104            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1105            seen_tees,
1106        );
1107
1108        transform_root(self);
1109    }
1110
1111    pub fn transform_children(
1112        &mut self,
1113        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1114        seen_tees: &mut SeenSharedNodes,
1115    ) {
1116        match self {
1117            HydroRoot::ForEach { input, .. }
1118            | HydroRoot::SendExternal { input, .. }
1119            | HydroRoot::DestSink { input, .. }
1120            | HydroRoot::CycleSink { input, .. }
1121            | HydroRoot::EmbeddedOutput { input, .. }
1122            | HydroRoot::Null { input, .. } => {
1123                transform(input, seen_tees);
1124            }
1125        }
1126    }
1127
1128    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1129        match self {
1130            HydroRoot::ForEach {
1131                f,
1132                input,
1133                op_metadata,
1134            } => HydroRoot::ForEach {
1135                f: f.clone(),
1136                input: Box::new(input.deep_clone(seen_tees)),
1137                op_metadata: op_metadata.clone(),
1138            },
1139            HydroRoot::SendExternal {
1140                to_external_key,
1141                to_port_id,
1142                to_many,
1143                unpaired,
1144                serialize_fn,
1145                instantiate_fn,
1146                input,
1147                op_metadata,
1148            } => HydroRoot::SendExternal {
1149                to_external_key: *to_external_key,
1150                to_port_id: *to_port_id,
1151                to_many: *to_many,
1152                unpaired: *unpaired,
1153                serialize_fn: serialize_fn.clone(),
1154                instantiate_fn: instantiate_fn.clone(),
1155                input: Box::new(input.deep_clone(seen_tees)),
1156                op_metadata: op_metadata.clone(),
1157            },
1158            HydroRoot::DestSink {
1159                sink,
1160                input,
1161                op_metadata,
1162            } => HydroRoot::DestSink {
1163                sink: sink.clone(),
1164                input: Box::new(input.deep_clone(seen_tees)),
1165                op_metadata: op_metadata.clone(),
1166            },
1167            HydroRoot::CycleSink {
1168                cycle_id,
1169                input,
1170                op_metadata,
1171            } => HydroRoot::CycleSink {
1172                cycle_id: *cycle_id,
1173                input: Box::new(input.deep_clone(seen_tees)),
1174                op_metadata: op_metadata.clone(),
1175            },
1176            HydroRoot::EmbeddedOutput {
1177                ident,
1178                input,
1179                op_metadata,
1180            } => HydroRoot::EmbeddedOutput {
1181                ident: ident.clone(),
1182                input: Box::new(input.deep_clone(seen_tees)),
1183                op_metadata: op_metadata.clone(),
1184            },
1185            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1186                input: Box::new(input.deep_clone(seen_tees)),
1187                op_metadata: op_metadata.clone(),
1188            },
1189        }
1190    }
1191
/// Emits this root and its input subtree into `graph_builders`.
///
/// Thin wrapper around `emit_core` that selects the
/// `BuildersOrCallback::Builders` mode.
#[cfg(feature = "build")]
pub fn emit(
    &mut self,
    graph_builders: &mut dyn DfirBuilder,
    seen_tees: &mut SeenSharedNodes,
    built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
    next_stmt_id: &mut usize,
) {
    // The `Builders` variant carries no callbacks, so plain fn-pointer types
    // are named only to pin down the two callback type parameters.
    type RootCallback = fn(&mut HydroRoot, &mut usize);
    type NodeCallback = fn(&mut HydroNode, &mut usize);

    let mut target = BuildersOrCallback::<RootCallback, NodeCallback>::Builders(graph_builders);
    self.emit_core(&mut target, seen_tees, built_tees, next_stmt_id);
}
1210
/// Walks this root and its input subtree, either emitting DFIR statements or
/// invoking callbacks, depending on `builders_or_callback`.
///
/// The input subtree is always processed first through
/// `HydroNode::emit_core`, which yields the identifier the input is bound to.
/// Afterwards:
///
/// * with `BuildersOrCallback::Builders`, a statement consuming that
///   identifier is added to the builder selected by the input's location;
/// * with `BuildersOrCallback::Callback`, the root callback is invoked with
///   this root and the current statement id.
///
/// `next_stmt_id` is incremented once per root, except for
/// `HydroRoot::CycleSink`, which is emitted without a statement id and is
/// never passed to the callback.
#[cfg(feature = "build")]
pub fn emit_core(
    &mut self,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut usize),
        impl FnMut(&mut HydroNode, &mut usize),
    >,
    seen_tees: &mut SeenSharedNodes,
    built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
    next_stmt_id: &mut usize,
) {
    match self {
        HydroRoot::ForEach { f, input, .. } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> for_each(#f);
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        HydroRoot::SendExternal {
            serialize_fn,
            instantiate_fn,
            input,
            ..
        } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    // Only the sink side is needed for an external output; a
                    // placeholder is emitted while the network is not finalized.
                    let sink_expr = match instantiate_fn {
                        DebugInstantiate::Building => syn::parse_quote!(DUMMY_SINK),

                        DebugInstantiate::Finalized(finalized) => finalized.sink.clone(),
                    };

                    graph_builders.create_external_output(
                        &input.metadata().location_id,
                        sink_expr,
                        &input_ident,
                        serialize_fn.as_ref(),
                        *next_stmt_id,
                    );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        HydroRoot::DestSink { sink, input, .. } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> dest_sink(#sink);
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        HydroRoot::CycleSink {
            cycle_id, input, ..
        } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    // Keyed collections flow through the cycle as `(key, value)` pairs.
                    let elem_type: syn::Type = match &input.metadata().collection_kind {
                        CollectionKind::KeyedSingleton {
                            key_type,
                            value_type,
                            ..
                        }
                        | CollectionKind::KeyedStream {
                            key_type,
                            value_type,
                            ..
                        } => {
                            parse_quote!((#key_type, #value_type))
                        }
                        CollectionKind::Stream { element_type, .. }
                        | CollectionKind::Singleton { element_type, .. }
                        | CollectionKind::Optional { element_type, .. } => {
                            parse_quote!(#element_type)
                        }
                    };

                    let cycle_id_ident = cycle_id.as_ident();
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                            },
                            None,
                            None,
                        );
                }
                // No ID, no callback
                BuildersOrCallback::Callback(_, _) => {}
            }
        }

        HydroRoot::EmbeddedOutput { ident, input, .. } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> for_each(&mut #ident);
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }

        HydroRoot::Null { input, .. } => {
            let input_ident =
                input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

            match builders_or_callback {
                BuildersOrCallback::Builders(graph_builders) => {
                    // The input is drained by a no-op consumer.
                    graph_builders
                        .get_dfir_mut(&input.metadata().location_id)
                        .add_dfir(
                            parse_quote! {
                                #input_ident -> for_each(|_| {});
                            },
                            None,
                            Some(&next_stmt_id.to_string()),
                        );
                }
                BuildersOrCallback::Callback(leaf_callback, _) => {
                    leaf_callback(self, next_stmt_id);
                }
            }

            *next_stmt_id += 1;
        }
    }
}
1402
1403    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1404        match self {
1405            HydroRoot::ForEach { op_metadata, .. }
1406            | HydroRoot::SendExternal { op_metadata, .. }
1407            | HydroRoot::DestSink { op_metadata, .. }
1408            | HydroRoot::CycleSink { op_metadata, .. }
1409            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1410            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1411        }
1412    }
1413
1414    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1415        match self {
1416            HydroRoot::ForEach { op_metadata, .. }
1417            | HydroRoot::SendExternal { op_metadata, .. }
1418            | HydroRoot::DestSink { op_metadata, .. }
1419            | HydroRoot::CycleSink { op_metadata, .. }
1420            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1421            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1422        }
1423    }
1424
1425    pub fn input(&self) -> &HydroNode {
1426        match self {
1427            HydroRoot::ForEach { input, .. }
1428            | HydroRoot::SendExternal { input, .. }
1429            | HydroRoot::DestSink { input, .. }
1430            | HydroRoot::CycleSink { input, .. }
1431            | HydroRoot::EmbeddedOutput { input, .. }
1432            | HydroRoot::Null { input, .. } => input,
1433        }
1434    }
1435
1436    pub fn input_metadata(&self) -> &HydroIrMetadata {
1437        self.input().metadata()
1438    }
1439
1440    pub fn print_root(&self) -> String {
1441        match self {
1442            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1443            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1444            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1445            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1446            HydroRoot::EmbeddedOutput { ident, .. } => {
1447                format!("EmbeddedOutput({})", ident)
1448            }
1449            HydroRoot::Null { .. } => "Null".to_owned(),
1450        }
1451    }
1452
1453    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1454        match self {
1455            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1456                transform(f);
1457            }
1458            HydroRoot::SendExternal { .. }
1459            | HydroRoot::CycleSink { .. }
1460            | HydroRoot::EmbeddedOutput { .. }
1461            | HydroRoot::Null { .. } => {}
1462        }
1463    }
1464}
1465
/// Returns the clock id of `loc` if it is a `Tick`, looking through any
/// number of enclosing `Atomic` wrappers; returns `None` for anything else.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            // Peel one `Atomic` layer and inspect what it wraps.
            LocationId::Atomic(inner) => current = &**inner,
            _ => return None,
        }
    }
}
1474
/// Rewrites every tick id inside `loc` to its union-find representative.
///
/// `Tick` ids are replaced by `uf_find(uf, id)`; the location nested inside a
/// `Tick` or `Atomic` is then processed the same way. `Process` and
/// `Cluster` locations are left untouched.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let nested = match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => return,
        LocationId::Atomic(inner) => inner,
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            inner
        }
    };

    remap_location(nested, uf);
}
1488
/// Union-find lookup: returns the representative of `x`.
///
/// An id without an entry in `parent` (or mapped to itself) is its own
/// representative. Every non-root id visited on the way is re-pointed
/// directly at the representative.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // First walk: follow parent links until an id that is its own parent.
    let mut root = x;
    loop {
        let next = parent.get(&root).copied().unwrap_or(root);
        if next == root {
            break;
        }
        root = next;
    }

    // Second walk: point every id on the path straight at the root.
    let mut node = x;
    while node != root {
        let next = parent.get(&node).copied().unwrap_or(node);
        parent.insert(node, root);
        node = next;
    }

    root
}
1499
/// Union-find merge: puts `a` and `b` into the same set.
///
/// When the two representatives differ, the representative of `a` is linked
/// under the representative of `b`.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);

    // Already in the same set: nothing to link.
    if root_a == root_b {
        return;
    }

    parent.insert(root_a, root_b);
}
1508
/// Unifies the tick IDs that meet at `Batch` and `YieldConcat` nodes.
///
/// A first traversal records, in a union-find, every pair of tick IDs found
/// on the inner and outer side of such a node. A second traversal rewrites
/// the `LocationId` of every node so that it uses the representative tick ID.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut representatives: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect unifications.
    let mut record_union = |node: &mut HydroNode| {
        let (inner, metadata) = match node {
            HydroNode::Batch { inner, metadata }
            | HydroNode::YieldConcat { inner, metadata } => (inner, metadata),
            _ => return,
        };

        let inner_tick = tick_of(&inner.metadata().location_id);
        let outer_tick = tick_of(&metadata.location_id);
        // Only unify when both sides actually live in a tick.
        if let (Some(a), Some(b)) = (inner_tick, outer_tick) {
            uf_union(&mut representatives, a, b);
        }
    };
    transform_bottom_up(ir, &mut |_| {}, &mut record_union, false);

    // Pass 2: rewrite all LocationIds.
    let mut rewrite_location = |node: &mut HydroNode| {
        remap_location(&mut node.metadata_mut().location_id, &mut representatives);
    };
    transform_bottom_up(ir, &mut |_| {}, &mut rewrite_location, false);
}
1544
/// Emits every root in `ir` and returns the resulting per-location builders.
///
/// The shared-node bookkeeping and the statement counter (starting at 0) are
/// shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut graph_builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut stmt_counter = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut graph_builders,
            &mut seen_tees,
            &mut built_tees,
            &mut stmt_counter,
        );
    });

    graph_builders
}
1561
/// Walks every root in `ir` in callback mode.
///
/// Uses the same traversal as emission (`emit_core`), but with
/// `BuildersOrCallback::Callback`, so `transform_root` and `transform_node`
/// are invoked instead of statements being generated. The statement counter
/// starts at 0 and is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut stmt_counter = 0;
    let mut dispatch = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir.iter_mut() {
        root.emit_core(
            &mut dispatch,
            &mut seen_tees,
            &mut built_tees,
            &mut stmt_counter,
        );
    }
}
1581
1582pub fn transform_bottom_up(
1583    ir: &mut [HydroRoot],
1584    transform_root: &mut impl FnMut(&mut HydroRoot),
1585    transform_node: &mut impl FnMut(&mut HydroNode),
1586    check_well_formed: bool,
1587) {
1588    let mut seen_tees = HashMap::new();
1589    ir.iter_mut().for_each(|leaf| {
1590        leaf.transform_bottom_up(
1591            transform_root,
1592            transform_node,
1593            &mut seen_tees,
1594            check_well_formed,
1595        );
1596    });
1597}
1598
1599pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1600    let mut seen_tees = HashMap::new();
1601    ir.iter()
1602        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1603        .collect()
1604}
1605
/// State used by the `Debug` impl of `SharedNode` to print the contents of each shared node
/// only once.
///
/// `None` means de-duplication is switched off. `Some((next_id, ids))` holds the next id to
/// hand out and the ids already assigned, keyed by the address of the shared cell.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Starts out as `None`; `dbg_dedup_tee` switches it on while its closure runs.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1610
1611pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1612    PRINTED_TEES.with(|printed_tees| {
1613        let mut printed_tees_mut = printed_tees.borrow_mut();
1614        *printed_tees_mut = Some((0, HashMap::new()));
1615        drop(printed_tees_mut);
1616
1617        let ret = f();
1618
1619        let mut printed_tees_mut = printed_tees.borrow_mut();
1620        *printed_tees_mut = None;
1621
1622        ret
1623    })
1624}
1625
1626pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1627
1628impl SharedNode {
1629    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1630        Rc::as_ptr(&self.0)
1631    }
1632}
1633
1634impl Debug for SharedNode {
1635    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1636        PRINTED_TEES.with(|printed_tees| {
1637            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1638            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1639
1640            if let Some(printed_tees_mut) = printed_tees_mut {
1641                if let Some(existing) = printed_tees_mut
1642                    .1
1643                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1644                {
1645                    write!(f, "<shared {}>", existing)
1646                } else {
1647                    let next_id = printed_tees_mut.0;
1648                    printed_tees_mut.0 += 1;
1649                    printed_tees_mut
1650                        .1
1651                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1652                    drop(printed_tees_mut_borrow);
1653                    write!(f, "<shared {}>: ", next_id)?;
1654                    Debug::fmt(&self.0.borrow(), f)
1655                }
1656            } else {
1657                drop(printed_tees_mut_borrow);
1658                write!(f, "<shared>: ")?;
1659                Debug::fmt(&self.0.borrow(), f)
1660            }
1661        })
1662    }
1663}
1664
1665impl Hash for SharedNode {
1666    fn hash<H: Hasher>(&self, state: &mut H) {
1667        self.0.borrow_mut().hash(state);
1668    }
1669}
1670
/// Boundedness marker stored in the `bound` field of `CollectionKind::Stream`,
/// `CollectionKind::Optional` and `CollectionKind::KeyedStream`.
///
/// Only `Bounded` makes `CollectionKind::is_bounded` return `true`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1676
/// Ordering marker stored in `CollectionKind::Stream` (`order`) and
/// `CollectionKind::KeyedStream` (`value_order`).
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1682
/// Retry marker stored in `CollectionKind::Stream` (`retry`) and
/// `CollectionKind::KeyedStream` (`value_retry`).
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1688
/// Boundedness marker stored in the `bound` field of `CollectionKind::KeyedSingleton`.
///
/// Only `Bounded` makes `CollectionKind::is_bounded` return `true`; `MonotonicValue` and
/// `BoundedValue` do not.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
1696
/// Boundedness marker stored in the `bound` field of `CollectionKind::Singleton`.
///
/// Only `Bounded` makes `CollectionKind::is_bounded` return `true`; `Monotonic` does not.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
1703
/// The kind of collection described by a `HydroIrMetadata`, together with its boundedness
/// marker and its element (or key/value) types.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream, with ordering and retry markers.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the ordering and retry markers are named `value_order` and
    /// `value_retry` here.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1733
1734impl CollectionKind {
1735    pub fn is_bounded(&self) -> bool {
1736        matches!(
1737            self,
1738            CollectionKind::Stream {
1739                bound: BoundKind::Bounded,
1740                ..
1741            } | CollectionKind::Singleton {
1742                bound: SingletonBoundKind::Bounded,
1743                ..
1744            } | CollectionKind::Optional {
1745                bound: BoundKind::Bounded,
1746                ..
1747            } | CollectionKind::KeyedStream {
1748                bound: BoundKind::Bounded,
1749                ..
1750            } | CollectionKind::KeyedSingleton {
1751                bound: KeyedSingletonBoundKind::Bounded,
1752                ..
1753            }
1754        )
1755    }
1756}
1757
/// Metadata carried by `HydroNode` variants.
///
/// The manual trait impls in this file make it invisible to hashing and equality, and its
/// `Debug` output shows only `location_id` and `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    pub location_id: LocationId,
    /// Kind of collection; see `CollectionKind`.
    pub collection_kind: CollectionKind,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Operator-level metadata; see `HydroIrOpMetadata`.
    pub op: HydroIrOpMetadata,
}
1766
1767// HydroIrMetadata shouldn't be used to hash or compare
1768impl Hash for HydroIrMetadata {
1769    fn hash<H: Hasher>(&self, _: &mut H) {}
1770}
1771
1772impl PartialEq for HydroIrMetadata {
1773    fn eq(&self, _: &Self) -> bool {
1774        true
1775    }
1776}
1777
1778impl Eq for HydroIrMetadata {}
1779
1780impl Debug for HydroIrMetadata {
1781    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1782        f.debug_struct("HydroIrMetadata")
1783            .field("location_id", &self.location_id)
1784            .field("collection_kind", &self.collection_kind)
1785            .finish()
1786    }
1787}
1788
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace obtained from `Backtrace::get_backtrace` when the value is built by `new`.
    pub backtrace: Backtrace,
    /// `None` when built by `new`.
    pub cpu_usage: Option<f64>,
    /// `None` when built by `new`.
    pub network_recv_cpu_usage: Option<f64>,
    /// `None` when built by `new`.
    pub id: Option<usize>,
}
1798
impl HydroIrOpMetadata {
    /// Creates op metadata with a freshly captured backtrace and with `cpu_usage`,
    /// `network_recv_cpu_usage` and `id` all set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Builds the metadata, forwarding `2 + skip_count` to `Backtrace::get_backtrace`.
    ///
    /// NOTE(review): the argument is presumably the number of innermost stack frames to leave
    /// out of the captured backtrace, with the constant accounting for this helper's own
    /// frames — confirm against `Backtrace::get_backtrace` before changing the call nesting.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1817
1818impl Debug for HydroIrOpMetadata {
1819    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1820        f.debug_struct("HydroIrOpMetadata").finish()
1821    }
1822}
1823
1824impl Hash for HydroIrOpMetadata {
1825    fn hash<H: Hasher>(&self, _: &mut H) {}
1826}
1827
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Every variant except `Placeholder` carries a `HydroIrMetadata`. Upstream inputs are owned
/// `Box<HydroNode>`s, except in `Tee` and `Partition`, which hold a `SharedNode`.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in value. `transform_children` and `deep_clone` park it in a shared
    /// cell while the real node is being processed; `transform_children` panics when it is
    /// asked to visit one.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// Refers to an upstream node through a `SharedNode`, so several nodes can point at the
    /// same upstream.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// Like `Tee`, refers to its upstream through a `SharedNode`, and additionally carries an
    /// expression `f` and an `is_true` flag.
    ///
    /// NOTE(review): presumably selects the elements for which `f` evaluates to `is_true` —
    /// confirm against the code that emits this node.
    Partition {
        inner: SharedNode,
        f: DebugExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// The only variant with two differently named boxed inputs besides the binary operators:
    /// `input` and `watermark`. `transform_children` visits `input` first.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Exempt from the location check in `transform_bottom_up`: its input is not required to
    /// share this node's root location.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Has no upstream `HydroNode`; `transform_children` treats it as a leaf.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2081
/// Maps the address of an original shared cell to the cell that replaces it, so that a shared
/// node reached through several `Tee`/`Partition` parents is transformed or cloned only once.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared cell to a `LocationId`.
///
/// NOTE(review): not used in the part of the file reviewed here — confirm the intended use at
/// its call sites.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2084
2085impl HydroNode {
2086    pub fn transform_bottom_up(
2087        &mut self,
2088        transform: &mut impl FnMut(&mut HydroNode),
2089        seen_tees: &mut SeenSharedNodes,
2090        check_well_formed: bool,
2091    ) {
2092        self.transform_children(
2093            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2094            seen_tees,
2095        );
2096
2097        transform(self);
2098
2099        let self_location = self.metadata().location_id.root();
2100
2101        if check_well_formed {
2102            match &*self {
2103                HydroNode::Network { .. } => {}
2104                _ => {
2105                    self.input_metadata().iter().for_each(|i| {
2106                        if i.location_id.root() != self_location {
2107                            panic!(
2108                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2109                                i,
2110                                i.location_id.root(),
2111                                self,
2112                                self_location
2113                            )
2114                        }
2115                    });
2116                }
2117            }
2118        }
2119    }
2120
2121    #[inline(always)]
2122    pub fn transform_children(
2123        &mut self,
2124        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2125        seen_tees: &mut SeenSharedNodes,
2126    ) {
2127        match self {
2128            HydroNode::Placeholder => {
2129                panic!();
2130            }
2131
2132            HydroNode::Source { .. }
2133            | HydroNode::SingletonSource { .. }
2134            | HydroNode::CycleSource { .. }
2135            | HydroNode::ExternalInput { .. } => {}
2136
2137            HydroNode::Tee { inner, .. } => {
2138                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2139                    *inner = SharedNode(transformed.clone());
2140                } else {
2141                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2142                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2143                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2144                    transform(&mut orig, seen_tees);
2145                    *transformed_cell.borrow_mut() = orig;
2146                    *inner = SharedNode(transformed_cell);
2147                }
2148            }
2149
2150            HydroNode::Partition { inner, .. } => {
2151                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2152                    *inner = SharedNode(transformed.clone());
2153                } else {
2154                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2155                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2156                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2157                    transform(&mut orig, seen_tees);
2158                    *transformed_cell.borrow_mut() = orig;
2159                    *inner = SharedNode(transformed_cell);
2160                }
2161            }
2162
2163            HydroNode::Cast { inner, .. }
2164            | HydroNode::ObserveNonDet { inner, .. }
2165            | HydroNode::BeginAtomic { inner, .. }
2166            | HydroNode::EndAtomic { inner, .. }
2167            | HydroNode::Batch { inner, .. }
2168            | HydroNode::YieldConcat { inner, .. } => {
2169                transform(inner.as_mut(), seen_tees);
2170            }
2171
2172            HydroNode::Chain { first, second, .. } => {
2173                transform(first.as_mut(), seen_tees);
2174                transform(second.as_mut(), seen_tees);
2175            }
2176
2177            HydroNode::ChainFirst { first, second, .. } => {
2178                transform(first.as_mut(), seen_tees);
2179                transform(second.as_mut(), seen_tees);
2180            }
2181
2182            HydroNode::CrossSingleton { left, right, .. }
2183            | HydroNode::CrossProduct { left, right, .. }
2184            | HydroNode::Join { left, right, .. } => {
2185                transform(left.as_mut(), seen_tees);
2186                transform(right.as_mut(), seen_tees);
2187            }
2188
2189            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2190                transform(pos.as_mut(), seen_tees);
2191                transform(neg.as_mut(), seen_tees);
2192            }
2193
2194            HydroNode::ReduceKeyedWatermark {
2195                input, watermark, ..
2196            } => {
2197                transform(input.as_mut(), seen_tees);
2198                transform(watermark.as_mut(), seen_tees);
2199            }
2200
2201            HydroNode::Map { input, .. }
2202            | HydroNode::ResolveFutures { input, .. }
2203            | HydroNode::ResolveFuturesBlocking { input, .. }
2204            | HydroNode::ResolveFuturesOrdered { input, .. }
2205            | HydroNode::FlatMap { input, .. }
2206            | HydroNode::FlatMapStreamBlocking { input, .. }
2207            | HydroNode::Filter { input, .. }
2208            | HydroNode::FilterMap { input, .. }
2209            | HydroNode::Sort { input, .. }
2210            | HydroNode::DeferTick { input, .. }
2211            | HydroNode::Enumerate { input, .. }
2212            | HydroNode::Inspect { input, .. }
2213            | HydroNode::Unique { input, .. }
2214            | HydroNode::Network { input, .. }
2215            | HydroNode::Fold { input, .. }
2216            | HydroNode::Scan { input, .. }
2217            | HydroNode::ScanAsyncBlocking { input, .. }
2218            | HydroNode::FoldKeyed { input, .. }
2219            | HydroNode::Reduce { input, .. }
2220            | HydroNode::ReduceKeyed { input, .. }
2221            | HydroNode::Counter { input, .. } => {
2222                transform(input.as_mut(), seen_tees);
2223            }
2224        }
2225    }
2226
2227    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2228        match self {
2229            HydroNode::Placeholder => HydroNode::Placeholder,
2230            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2231                inner: Box::new(inner.deep_clone(seen_tees)),
2232                metadata: metadata.clone(),
2233            },
2234            HydroNode::ObserveNonDet {
2235                inner,
2236                trusted,
2237                metadata,
2238            } => HydroNode::ObserveNonDet {
2239                inner: Box::new(inner.deep_clone(seen_tees)),
2240                trusted: *trusted,
2241                metadata: metadata.clone(),
2242            },
2243            HydroNode::Source { source, metadata } => HydroNode::Source {
2244                source: source.clone(),
2245                metadata: metadata.clone(),
2246            },
2247            HydroNode::SingletonSource {
2248                value,
2249                first_tick_only,
2250                metadata,
2251            } => HydroNode::SingletonSource {
2252                value: value.clone(),
2253                first_tick_only: *first_tick_only,
2254                metadata: metadata.clone(),
2255            },
2256            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2257                cycle_id: *cycle_id,
2258                metadata: metadata.clone(),
2259            },
2260            HydroNode::Tee { inner, metadata } => {
2261                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2262                    HydroNode::Tee {
2263                        inner: SharedNode(transformed.clone()),
2264                        metadata: metadata.clone(),
2265                    }
2266                } else {
2267                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2268                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2269                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2270                    *new_rc.borrow_mut() = cloned;
2271                    HydroNode::Tee {
2272                        inner: SharedNode(new_rc),
2273                        metadata: metadata.clone(),
2274                    }
2275                }
2276            }
2277            HydroNode::Partition {
2278                inner,
2279                f,
2280                is_true,
2281                metadata,
2282            } => {
2283                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2284                    HydroNode::Partition {
2285                        inner: SharedNode(transformed.clone()),
2286                        f: f.clone(),
2287                        is_true: *is_true,
2288                        metadata: metadata.clone(),
2289                    }
2290                } else {
2291                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2292                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2293                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2294                    *new_rc.borrow_mut() = cloned;
2295                    HydroNode::Partition {
2296                        inner: SharedNode(new_rc),
2297                        f: f.clone(),
2298                        is_true: *is_true,
2299                        metadata: metadata.clone(),
2300                    }
2301                }
2302            }
2303            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2304                inner: Box::new(inner.deep_clone(seen_tees)),
2305                metadata: metadata.clone(),
2306            },
2307            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2308                inner: Box::new(inner.deep_clone(seen_tees)),
2309                metadata: metadata.clone(),
2310            },
2311            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2312                inner: Box::new(inner.deep_clone(seen_tees)),
2313                metadata: metadata.clone(),
2314            },
2315            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2316                inner: Box::new(inner.deep_clone(seen_tees)),
2317                metadata: metadata.clone(),
2318            },
2319            HydroNode::Chain {
2320                first,
2321                second,
2322                metadata,
2323            } => HydroNode::Chain {
2324                first: Box::new(first.deep_clone(seen_tees)),
2325                second: Box::new(second.deep_clone(seen_tees)),
2326                metadata: metadata.clone(),
2327            },
2328            HydroNode::ChainFirst {
2329                first,
2330                second,
2331                metadata,
2332            } => HydroNode::ChainFirst {
2333                first: Box::new(first.deep_clone(seen_tees)),
2334                second: Box::new(second.deep_clone(seen_tees)),
2335                metadata: metadata.clone(),
2336            },
2337            HydroNode::CrossProduct {
2338                left,
2339                right,
2340                metadata,
2341            } => HydroNode::CrossProduct {
2342                left: Box::new(left.deep_clone(seen_tees)),
2343                right: Box::new(right.deep_clone(seen_tees)),
2344                metadata: metadata.clone(),
2345            },
2346            HydroNode::CrossSingleton {
2347                left,
2348                right,
2349                metadata,
2350            } => HydroNode::CrossSingleton {
2351                left: Box::new(left.deep_clone(seen_tees)),
2352                right: Box::new(right.deep_clone(seen_tees)),
2353                metadata: metadata.clone(),
2354            },
2355            HydroNode::Join {
2356                left,
2357                right,
2358                metadata,
2359            } => HydroNode::Join {
2360                left: Box::new(left.deep_clone(seen_tees)),
2361                right: Box::new(right.deep_clone(seen_tees)),
2362                metadata: metadata.clone(),
2363            },
2364            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2365                pos: Box::new(pos.deep_clone(seen_tees)),
2366                neg: Box::new(neg.deep_clone(seen_tees)),
2367                metadata: metadata.clone(),
2368            },
2369            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2370                pos: Box::new(pos.deep_clone(seen_tees)),
2371                neg: Box::new(neg.deep_clone(seen_tees)),
2372                metadata: metadata.clone(),
2373            },
2374            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2375                input: Box::new(input.deep_clone(seen_tees)),
2376                metadata: metadata.clone(),
2377            },
2378            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2379                HydroNode::ResolveFuturesBlocking {
2380                    input: Box::new(input.deep_clone(seen_tees)),
2381                    metadata: metadata.clone(),
2382                }
2383            }
2384            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2385                HydroNode::ResolveFuturesOrdered {
2386                    input: Box::new(input.deep_clone(seen_tees)),
2387                    metadata: metadata.clone(),
2388                }
2389            }
2390            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2391                f: f.clone(),
2392                input: Box::new(input.deep_clone(seen_tees)),
2393                metadata: metadata.clone(),
2394            },
2395            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2396                f: f.clone(),
2397                input: Box::new(input.deep_clone(seen_tees)),
2398                metadata: metadata.clone(),
2399            },
2400            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2401                HydroNode::FlatMapStreamBlocking {
2402                    f: f.clone(),
2403                    input: Box::new(input.deep_clone(seen_tees)),
2404                    metadata: metadata.clone(),
2405                }
2406            }
2407            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2408                f: f.clone(),
2409                input: Box::new(input.deep_clone(seen_tees)),
2410                metadata: metadata.clone(),
2411            },
2412            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2413                f: f.clone(),
2414                input: Box::new(input.deep_clone(seen_tees)),
2415                metadata: metadata.clone(),
2416            },
2417            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2418                input: Box::new(input.deep_clone(seen_tees)),
2419                metadata: metadata.clone(),
2420            },
2421            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2422                input: Box::new(input.deep_clone(seen_tees)),
2423                metadata: metadata.clone(),
2424            },
2425            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2426                f: f.clone(),
2427                input: Box::new(input.deep_clone(seen_tees)),
2428                metadata: metadata.clone(),
2429            },
2430            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2431                input: Box::new(input.deep_clone(seen_tees)),
2432                metadata: metadata.clone(),
2433            },
2434            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2435                input: Box::new(input.deep_clone(seen_tees)),
2436                metadata: metadata.clone(),
2437            },
2438            HydroNode::Fold {
2439                init,
2440                acc,
2441                input,
2442                metadata,
2443            } => HydroNode::Fold {
2444                init: init.clone(),
2445                acc: acc.clone(),
2446                input: Box::new(input.deep_clone(seen_tees)),
2447                metadata: metadata.clone(),
2448            },
2449            HydroNode::Scan {
2450                init,
2451                acc,
2452                input,
2453                metadata,
2454            } => HydroNode::Scan {
2455                init: init.clone(),
2456                acc: acc.clone(),
2457                input: Box::new(input.deep_clone(seen_tees)),
2458                metadata: metadata.clone(),
2459            },
2460            HydroNode::ScanAsyncBlocking {
2461                init,
2462                acc,
2463                input,
2464                metadata,
2465            } => HydroNode::ScanAsyncBlocking {
2466                init: init.clone(),
2467                acc: acc.clone(),
2468                input: Box::new(input.deep_clone(seen_tees)),
2469                metadata: metadata.clone(),
2470            },
2471            HydroNode::FoldKeyed {
2472                init,
2473                acc,
2474                input,
2475                metadata,
2476            } => HydroNode::FoldKeyed {
2477                init: init.clone(),
2478                acc: acc.clone(),
2479                input: Box::new(input.deep_clone(seen_tees)),
2480                metadata: metadata.clone(),
2481            },
2482            HydroNode::ReduceKeyedWatermark {
2483                f,
2484                input,
2485                watermark,
2486                metadata,
2487            } => HydroNode::ReduceKeyedWatermark {
2488                f: f.clone(),
2489                input: Box::new(input.deep_clone(seen_tees)),
2490                watermark: Box::new(watermark.deep_clone(seen_tees)),
2491                metadata: metadata.clone(),
2492            },
2493            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2494                f: f.clone(),
2495                input: Box::new(input.deep_clone(seen_tees)),
2496                metadata: metadata.clone(),
2497            },
2498            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2499                f: f.clone(),
2500                input: Box::new(input.deep_clone(seen_tees)),
2501                metadata: metadata.clone(),
2502            },
2503            HydroNode::Network {
2504                name,
2505                networking_info,
2506                serialize_fn,
2507                instantiate_fn,
2508                deserialize_fn,
2509                input,
2510                metadata,
2511            } => HydroNode::Network {
2512                name: name.clone(),
2513                networking_info: networking_info.clone(),
2514                serialize_fn: serialize_fn.clone(),
2515                instantiate_fn: instantiate_fn.clone(),
2516                deserialize_fn: deserialize_fn.clone(),
2517                input: Box::new(input.deep_clone(seen_tees)),
2518                metadata: metadata.clone(),
2519            },
2520            HydroNode::ExternalInput {
2521                from_external_key,
2522                from_port_id,
2523                from_many,
2524                codec_type,
2525                port_hint,
2526                instantiate_fn,
2527                deserialize_fn,
2528                metadata,
2529            } => HydroNode::ExternalInput {
2530                from_external_key: *from_external_key,
2531                from_port_id: *from_port_id,
2532                from_many: *from_many,
2533                codec_type: codec_type.clone(),
2534                port_hint: *port_hint,
2535                instantiate_fn: instantiate_fn.clone(),
2536                deserialize_fn: deserialize_fn.clone(),
2537                metadata: metadata.clone(),
2538            },
2539            HydroNode::Counter {
2540                tag,
2541                duration,
2542                prefix,
2543                input,
2544                metadata,
2545            } => HydroNode::Counter {
2546                tag: tag.clone(),
2547                duration: duration.clone(),
2548                prefix: prefix.clone(),
2549                input: Box::new(input.deep_clone(seen_tees)),
2550                metadata: metadata.clone(),
2551            },
2552        }
2553    }
2554
2555    #[cfg(feature = "build")]
2556    pub fn emit_core(
2557        &mut self,
2558        builders_or_callback: &mut BuildersOrCallback<
2559            impl FnMut(&mut HydroRoot, &mut usize),
2560            impl FnMut(&mut HydroNode, &mut usize),
2561        >,
2562        seen_tees: &mut SeenSharedNodes,
2563        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2564        next_stmt_id: &mut usize,
2565    ) -> syn::Ident {
2566        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2567
2568        self.transform_bottom_up(
2569            &mut |node: &mut HydroNode| {
2570                let out_location = node.metadata().location_id.clone();
2571                match node {
2572                    HydroNode::Placeholder => {
2573                        panic!()
2574                    }
2575
2576                    HydroNode::Cast { .. } => {
2577                        // Cast passes through the input ident unchanged
2578                        // The input ident is already on the stack from processing the child
2579                        match builders_or_callback {
2580                            BuildersOrCallback::Builders(_) => {}
2581                            BuildersOrCallback::Callback(_, node_callback) => {
2582                                node_callback(node, next_stmt_id);
2583                            }
2584                        }
2585
2586                        *next_stmt_id += 1;
2587                        // input_ident stays on stack as output
2588                    }
2589
2590                    HydroNode::ObserveNonDet {
2591                        inner,
2592                        trusted,
2593                        metadata,
2594                        ..
2595                    } => {
2596                        let inner_ident = ident_stack.pop().unwrap();
2597
2598                        let observe_ident =
2599                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2600
2601                        match builders_or_callback {
2602                            BuildersOrCallback::Builders(graph_builders) => {
2603                                graph_builders.observe_nondet(
2604                                    *trusted,
2605                                    &inner.metadata().location_id,
2606                                    inner_ident,
2607                                    &inner.metadata().collection_kind,
2608                                    &observe_ident,
2609                                    &metadata.collection_kind,
2610                                    &metadata.op,
2611                                );
2612                            }
2613                            BuildersOrCallback::Callback(_, node_callback) => {
2614                                node_callback(node, next_stmt_id);
2615                            }
2616                        }
2617
2618                        *next_stmt_id += 1;
2619
2620                        ident_stack.push(observe_ident);
2621                    }
2622
2623                    HydroNode::Batch {
2624                        inner, metadata, ..
2625                    } => {
2626                        let inner_ident = ident_stack.pop().unwrap();
2627
2628                        let batch_ident =
2629                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2630
2631                        match builders_or_callback {
2632                            BuildersOrCallback::Builders(graph_builders) => {
2633                                graph_builders.batch(
2634                                    inner_ident,
2635                                    &inner.metadata().location_id,
2636                                    &inner.metadata().collection_kind,
2637                                    &batch_ident,
2638                                    &out_location,
2639                                    &metadata.op,
2640                                );
2641                            }
2642                            BuildersOrCallback::Callback(_, node_callback) => {
2643                                node_callback(node, next_stmt_id);
2644                            }
2645                        }
2646
2647                        *next_stmt_id += 1;
2648
2649                        ident_stack.push(batch_ident);
2650                    }
2651
2652                    HydroNode::YieldConcat { inner, .. } => {
2653                        let inner_ident = ident_stack.pop().unwrap();
2654
2655                        let yield_ident =
2656                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2657
2658                        match builders_or_callback {
2659                            BuildersOrCallback::Builders(graph_builders) => {
2660                                graph_builders.yield_from_tick(
2661                                    inner_ident,
2662                                    &inner.metadata().location_id,
2663                                    &inner.metadata().collection_kind,
2664                                    &yield_ident,
2665                                    &out_location,
2666                                );
2667                            }
2668                            BuildersOrCallback::Callback(_, node_callback) => {
2669                                node_callback(node, next_stmt_id);
2670                            }
2671                        }
2672
2673                        *next_stmt_id += 1;
2674
2675                        ident_stack.push(yield_ident);
2676                    }
2677
2678                    HydroNode::BeginAtomic { inner, metadata } => {
2679                        let inner_ident = ident_stack.pop().unwrap();
2680
2681                        let begin_ident =
2682                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2683
2684                        match builders_or_callback {
2685                            BuildersOrCallback::Builders(graph_builders) => {
2686                                graph_builders.begin_atomic(
2687                                    inner_ident,
2688                                    &inner.metadata().location_id,
2689                                    &inner.metadata().collection_kind,
2690                                    &begin_ident,
2691                                    &out_location,
2692                                    &metadata.op,
2693                                );
2694                            }
2695                            BuildersOrCallback::Callback(_, node_callback) => {
2696                                node_callback(node, next_stmt_id);
2697                            }
2698                        }
2699
2700                        *next_stmt_id += 1;
2701
2702                        ident_stack.push(begin_ident);
2703                    }
2704
2705                    HydroNode::EndAtomic { inner, .. } => {
2706                        let inner_ident = ident_stack.pop().unwrap();
2707
2708                        let end_ident =
2709                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2710
2711                        match builders_or_callback {
2712                            BuildersOrCallback::Builders(graph_builders) => {
2713                                graph_builders.end_atomic(
2714                                    inner_ident,
2715                                    &inner.metadata().location_id,
2716                                    &inner.metadata().collection_kind,
2717                                    &end_ident,
2718                                );
2719                            }
2720                            BuildersOrCallback::Callback(_, node_callback) => {
2721                                node_callback(node, next_stmt_id);
2722                            }
2723                        }
2724
2725                        *next_stmt_id += 1;
2726
2727                        ident_stack.push(end_ident);
2728                    }
2729
2730                    HydroNode::Source {
2731                        source, metadata, ..
2732                    } => {
2733                        if let HydroSource::ExternalNetwork() = source {
2734                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2735                        } else {
2736                            let source_ident =
2737                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2738
2739                            let source_stmt = match source {
2740                                HydroSource::Stream(expr) => {
2741                                    debug_assert!(metadata.location_id.is_top_level());
2742                                    parse_quote! {
2743                                        #source_ident = source_stream(#expr);
2744                                    }
2745                                }
2746
2747                                HydroSource::ExternalNetwork() => {
2748                                    unreachable!()
2749                                }
2750
2751                                HydroSource::Iter(expr) => {
2752                                    if metadata.location_id.is_top_level() {
2753                                        parse_quote! {
2754                                            #source_ident = source_iter(#expr);
2755                                        }
2756                                    } else {
2757                                        // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
2758                                        parse_quote! {
2759                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2760                                        }
2761                                    }
2762                                }
2763
2764                                HydroSource::Spin() => {
2765                                    debug_assert!(metadata.location_id.is_top_level());
2766                                    parse_quote! {
2767                                        #source_ident = spin();
2768                                    }
2769                                }
2770
2771                                HydroSource::ClusterMembers(target_loc, state) => {
2772                                    debug_assert!(metadata.location_id.is_top_level());
2773
2774                                    let members_tee_ident = syn::Ident::new(
2775                                        &format!(
2776                                            "__cluster_members_tee_{}_{}",
2777                                            metadata.location_id.root().key(),
2778                                            target_loc.key(),
2779                                        ),
2780                                        Span::call_site(),
2781                                    );
2782
2783                                    match state {
2784                                        ClusterMembersState::Stream(d) => {
2785                                            parse_quote! {
2786                                                #members_tee_ident = source_stream(#d) -> tee();
2787                                                #source_ident = #members_tee_ident;
2788                                            }
2789                                        },
2790                                        ClusterMembersState::Uninit => syn::parse_quote! {
2791                                            #source_ident = source_stream(DUMMY);
2792                                        },
2793                                        ClusterMembersState::Tee(..) => parse_quote! {
2794                                            #source_ident = #members_tee_ident;
2795                                        },
2796                                    }
2797                                }
2798
2799                                HydroSource::Embedded(ident) => {
2800                                    parse_quote! {
2801                                        #source_ident = source_stream(#ident);
2802                                    }
2803                                }
2804
2805                                HydroSource::EmbeddedSingleton(ident) => {
2806                                    parse_quote! {
2807                                        #source_ident = source_iter([#ident]);
2808                                    }
2809                                }
2810                            };
2811
2812                            match builders_or_callback {
2813                                BuildersOrCallback::Builders(graph_builders) => {
2814                                    let builder = graph_builders.get_dfir_mut(&out_location);
2815                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2816                                }
2817                                BuildersOrCallback::Callback(_, node_callback) => {
2818                                    node_callback(node, next_stmt_id);
2819                                }
2820                            }
2821
2822                            *next_stmt_id += 1;
2823
2824                            ident_stack.push(source_ident);
2825                        }
2826                    }
2827
2828                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2829                        let source_ident =
2830                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2831
2832                        match builders_or_callback {
2833                            BuildersOrCallback::Builders(graph_builders) => {
2834                                let builder = graph_builders.get_dfir_mut(&out_location);
2835
2836                                if *first_tick_only {
2837                                    assert!(
2838                                        !metadata.location_id.is_top_level(),
2839                                        "first_tick_only SingletonSource must be inside a tick"
2840                                    );
2841                                }
2842
2843                                if *first_tick_only
2844                                    || (metadata.location_id.is_top_level()
2845                                        && metadata.collection_kind.is_bounded())
2846                                {
2847                                    builder.add_dfir(
2848                                        parse_quote! {
2849                                            #source_ident = source_iter([#value]);
2850                                        },
2851                                        None,
2852                                        Some(&next_stmt_id.to_string()),
2853                                    );
2854                                } else {
2855                                    builder.add_dfir(
2856                                        parse_quote! {
2857                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2858                                        },
2859                                        None,
2860                                        Some(&next_stmt_id.to_string()),
2861                                    );
2862                                }
2863                            }
2864                            BuildersOrCallback::Callback(_, node_callback) => {
2865                                node_callback(node, next_stmt_id);
2866                            }
2867                        }
2868
2869                        *next_stmt_id += 1;
2870
2871                        ident_stack.push(source_ident);
2872                    }
2873
2874                    HydroNode::CycleSource { cycle_id, .. } => {
2875                        let ident = cycle_id.as_ident();
2876
2877                        match builders_or_callback {
2878                            BuildersOrCallback::Builders(_) => {}
2879                            BuildersOrCallback::Callback(_, node_callback) => {
2880                                node_callback(node, next_stmt_id);
2881                            }
2882                        }
2883
2884                        // consume a stmt id even though we did not emit anything so that we can instrument this
2885                        *next_stmt_id += 1;
2886
2887                        ident_stack.push(ident);
2888                    }
2889
2890                    HydroNode::Tee { inner, .. } => {
2891                        let ret_ident = if let Some(built_idents) =
2892                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2893                        {
2894                            match builders_or_callback {
2895                                BuildersOrCallback::Builders(_) => {}
2896                                BuildersOrCallback::Callback(_, node_callback) => {
2897                                    node_callback(node, next_stmt_id);
2898                                }
2899                            }
2900
2901                            built_idents[0].clone()
2902                        } else {
2903                            // The inner node was already processed by transform_bottom_up,
2904                            // so its ident is on the stack
2905                            let inner_ident = ident_stack.pop().unwrap();
2906
2907                            let tee_ident =
2908                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2909
2910                            built_tees.insert(
2911                                inner.0.as_ref() as *const RefCell<HydroNode>,
2912                                vec![tee_ident.clone()],
2913                            );
2914
2915                            match builders_or_callback {
2916                                BuildersOrCallback::Builders(graph_builders) => {
2917                                    let builder = graph_builders.get_dfir_mut(&out_location);
2918                                    builder.add_dfir(
2919                                        parse_quote! {
2920                                            #tee_ident = #inner_ident -> tee();
2921                                        },
2922                                        None,
2923                                        Some(&next_stmt_id.to_string()),
2924                                    );
2925                                }
2926                                BuildersOrCallback::Callback(_, node_callback) => {
2927                                    node_callback(node, next_stmt_id);
2928                                }
2929                            }
2930
2931                            tee_ident
2932                        };
2933
2934                        // we consume a stmt id regardless of whether we emit the tee() operator,
2935                        // so that during rewrites we touch all recipients of the tee()
2936
2937                        *next_stmt_id += 1;
2938                        ident_stack.push(ret_ident);
2939                    }
2940
2941                    HydroNode::Partition {
2942                        inner, f, is_true, ..
2943                    } => {
2944                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
2945                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
2946                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
2947                            match builders_or_callback {
2948                                BuildersOrCallback::Builders(_) => {}
2949                                BuildersOrCallback::Callback(_, node_callback) => {
2950                                    node_callback(node, next_stmt_id);
2951                                }
2952                            }
2953
2954                            let idx = if is_true { 0 } else { 1 };
2955                            built_idents[idx].clone()
2956                        } else {
2957                            // The inner node was already processed by transform_bottom_up,
2958                            // so its ident is on the stack
2959                            let inner_ident = ident_stack.pop().unwrap();
2960
2961                            let partition_ident = syn::Ident::new(
2962                                &format!("stream_{}_partition", *next_stmt_id),
2963                                Span::call_site(),
2964                            );
2965                            let true_ident = syn::Ident::new(
2966                                &format!("stream_{}_true", *next_stmt_id),
2967                                Span::call_site(),
2968                            );
2969                            let false_ident = syn::Ident::new(
2970                                &format!("stream_{}_false", *next_stmt_id),
2971                                Span::call_site(),
2972                            );
2973
2974                            built_tees.insert(
2975                                ptr,
2976                                vec![true_ident.clone(), false_ident.clone()],
2977                            );
2978
2979                            match builders_or_callback {
2980                                BuildersOrCallback::Builders(graph_builders) => {
2981                                    let builder = graph_builders.get_dfir_mut(&out_location);
2982                                    builder.add_dfir(
2983                                        parse_quote! {
2984                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
2985                                            #true_ident = #partition_ident[0];
2986                                            #false_ident = #partition_ident[1];
2987                                        },
2988                                        None,
2989                                        Some(&next_stmt_id.to_string()),
2990                                    );
2991                                }
2992                                BuildersOrCallback::Callback(_, node_callback) => {
2993                                    node_callback(node, next_stmt_id);
2994                                }
2995                            }
2996
2997                            if is_true { true_ident } else { false_ident }
2998                        };
2999
3000                        *next_stmt_id += 1;
3001                        ident_stack.push(ret_ident);
3002                    }
3003
3004                    HydroNode::Chain { .. } => {
3005                        // Children are processed left-to-right, so second is on top
3006                        let second_ident = ident_stack.pop().unwrap();
3007                        let first_ident = ident_stack.pop().unwrap();
3008
3009                        let chain_ident =
3010                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3011
3012                        match builders_or_callback {
3013                            BuildersOrCallback::Builders(graph_builders) => {
3014                                let builder = graph_builders.get_dfir_mut(&out_location);
3015                                builder.add_dfir(
3016                                    parse_quote! {
3017                                        #chain_ident = chain();
3018                                        #first_ident -> [0]#chain_ident;
3019                                        #second_ident -> [1]#chain_ident;
3020                                    },
3021                                    None,
3022                                    Some(&next_stmt_id.to_string()),
3023                                );
3024                            }
3025                            BuildersOrCallback::Callback(_, node_callback) => {
3026                                node_callback(node, next_stmt_id);
3027                            }
3028                        }
3029
3030                        *next_stmt_id += 1;
3031
3032                        ident_stack.push(chain_ident);
3033                    }
3034
3035                    HydroNode::ChainFirst { .. } => {
3036                        let second_ident = ident_stack.pop().unwrap();
3037                        let first_ident = ident_stack.pop().unwrap();
3038
3039                        let chain_ident =
3040                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3041
3042                        match builders_or_callback {
3043                            BuildersOrCallback::Builders(graph_builders) => {
3044                                let builder = graph_builders.get_dfir_mut(&out_location);
3045                                builder.add_dfir(
3046                                    parse_quote! {
3047                                        #chain_ident = chain_first_n(1);
3048                                        #first_ident -> [0]#chain_ident;
3049                                        #second_ident -> [1]#chain_ident;
3050                                    },
3051                                    None,
3052                                    Some(&next_stmt_id.to_string()),
3053                                );
3054                            }
3055                            BuildersOrCallback::Callback(_, node_callback) => {
3056                                node_callback(node, next_stmt_id);
3057                            }
3058                        }
3059
3060                        *next_stmt_id += 1;
3061
3062                        ident_stack.push(chain_ident);
3063                    }
3064
3065                    HydroNode::CrossSingleton { right, .. } => {
3066                        let right_ident = ident_stack.pop().unwrap();
3067                        let left_ident = ident_stack.pop().unwrap();
3068
3069                        let cross_ident =
3070                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3071
3072                        match builders_or_callback {
3073                            BuildersOrCallback::Builders(graph_builders) => {
3074                                let builder = graph_builders.get_dfir_mut(&out_location);
3075
3076                                if right.metadata().location_id.is_top_level()
3077                                    && right.metadata().collection_kind.is_bounded()
3078                                {
3079                                    builder.add_dfir(
3080                                        parse_quote! {
3081                                            #cross_ident = cross_singleton();
3082                                            #left_ident -> [input]#cross_ident;
3083                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
3084                                        },
3085                                        None,
3086                                        Some(&next_stmt_id.to_string()),
3087                                    );
3088                                } else {
3089                                    builder.add_dfir(
3090                                        parse_quote! {
3091                                            #cross_ident = cross_singleton();
3092                                            #left_ident -> [input]#cross_ident;
3093                                            #right_ident -> [single]#cross_ident;
3094                                        },
3095                                        None,
3096                                        Some(&next_stmt_id.to_string()),
3097                                    );
3098                                }
3099                            }
3100                            BuildersOrCallback::Callback(_, node_callback) => {
3101                                node_callback(node, next_stmt_id);
3102                            }
3103                        }
3104
3105                        *next_stmt_id += 1;
3106
3107                        ident_stack.push(cross_ident);
3108                    }
3109
3110                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
                            // Two-input join-style node. `CrossProduct` lowers to `cross_join_multiset`
                            // and `Join` to `join_multiset`; everything after the operator choice is
                            // shared because both variants expose the same `left`/`right` fields.
3111                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3112                            parse_quote!(cross_join_multiset)
3113                        } else {
3114                            parse_quote!(join_multiset)
3115                        };
3116
3117                        let (HydroNode::CrossProduct { left, right, .. }
3118                        | HydroNode::Join { left, right, .. }) = node
3119                        else {
3120                            unreachable!()
3121                        };
3122
                            // Each side gets its own lifetime argument: `'static` when that input's
                            // location is top-level, `'tick` otherwise. `is_top_level` is true only
                            // when both sides are top-level.
3123                        let is_top_level = left.metadata().location_id.is_top_level()
3124                            && right.metadata().location_id.is_top_level();
3125                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3126                            quote!('static)
3127                        } else {
3128                            quote!('tick)
3129                        };
3130
3131                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3132                            quote!('static)
3133                        } else {
3134                            quote!('tick)
3135                        };
3136
                            // The right ident is popped first, i.e. it is the most recently pushed one.
                            // NOTE(review): presumably the inputs were emitted left-then-right by code
                            // outside this arm -- confirm.
3137                        let right_ident = ident_stack.pop().unwrap();
3138                        let left_ident = ident_stack.pop().unwrap();
3139
                            // Output ident is named after the current statement id.
3140                        let stream_ident =
3141                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3142
3143                        match builders_or_callback {
3144                            BuildersOrCallback::Builders(graph_builders) => {
3145                                let builder = graph_builders.get_dfir_mut(&out_location);
3146                                builder.add_dfir(
3147                                    if is_top_level {
3148                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3149                                        // a multiset_delta() to negate the replay behavior
3150                                        parse_quote! {
3151                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3152                                            #left_ident -> [0]#stream_ident;
3153                                            #right_ident -> [1]#stream_ident;
3154                                        }
3155                                    } else {
3156                                        parse_quote! {
3157                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3158                                            #left_ident -> [0]#stream_ident;
3159                                            #right_ident -> [1]#stream_ident;
3160                                        }
3161                                    }
3162                                    ,
3163                                    None,
3164                                    Some(&next_stmt_id.to_string()),
3165                                );
3166                            }
3167                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the callback only observes the node
                                    // together with the statement id it would have received.
3168                                node_callback(node, next_stmt_id);
3169                            }
3170                        }
3171
                            // The statement id advances in both modes.
3172                        *next_stmt_id += 1;
3173
                            // Publish this node's output ident on the stack.
3174                        ident_stack.push(stream_ident);
3175                    }
3176
3177                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
                            // Two-input subtraction-style node. `Difference` lowers to `difference` and
                            // `AntiJoin` to `anti_join`; both wire a positive input into the `pos` port
                            // and a negative input into the `neg` port.
3178                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3179                            parse_quote!(difference)
3180                        } else {
3181                            parse_quote!(anti_join)
3182                        };
3183
3184                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3185                            node
3186                        else {
3187                            unreachable!()
3188                        };
3189
                            // Only the negative side's lifetime varies with its location; the first
                            // lifetime argument in the emitted operator below is always `'tick`.
3190                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3191                            quote!('static)
3192                        } else {
3193                            quote!('tick)
3194                        };
3195
                            // The negative ident is on top of the stack, the positive one beneath it.
3196                        let neg_ident = ident_stack.pop().unwrap();
3197                        let pos_ident = ident_stack.pop().unwrap();
3198
3199                        let stream_ident =
3200                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3201
3202                        match builders_or_callback {
3203                            BuildersOrCallback::Builders(graph_builders) => {
3204                                let builder = graph_builders.get_dfir_mut(&out_location);
3205                                builder.add_dfir(
3206                                    parse_quote! {
3207                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3208                                        #pos_ident -> [pos]#stream_ident;
3209                                        #neg_ident -> [neg]#stream_ident;
3210                                    },
3211                                    None,
3212                                    Some(&next_stmt_id.to_string()),
3213                                );
3214                            }
3215                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // No DFIR is emitted in callback mode; the node is only reported.
3216                                node_callback(node, next_stmt_id);
3217                            }
3218                        }
3219
                            // The statement id advances in both modes.
3220                        *next_stmt_id += 1;
3221
3222                        ident_stack.push(stream_ident);
3223                    }
3224
3225                    HydroNode::ResolveFutures { .. } => {
                            // Unary node: pops its input ident and, in builder mode, emits
                            // `stream_<id> = <input> -> resolve_futures()`.
3226                        let input_ident = ident_stack.pop().unwrap();
3227
3228                        let futures_ident =
3229                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3230
3231                        match builders_or_callback {
3232                            BuildersOrCallback::Builders(graph_builders) => {
3233                                let builder = graph_builders.get_dfir_mut(&out_location);
3234                                builder.add_dfir(
3235                                    parse_quote! {
3236                                        #futures_ident = #input_ident -> resolve_futures();
3237                                    },
3238                                    None,
3239                                    Some(&next_stmt_id.to_string()),
3240                                );
3241                            }
3242                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: report the node and its statement id, emit nothing.
3243                                node_callback(node, next_stmt_id);
3244                            }
3245                        }
3246
                            // Consume the statement id and expose the output ident in both modes.
3247                        *next_stmt_id += 1;
3248
3249                        ident_stack.push(futures_ident);
3250                    }
3251
3252                    HydroNode::ResolveFuturesBlocking { .. } => {
                            // Unary node: pops its input ident and, in builder mode, emits
                            // `stream_<id> = <input> -> resolve_futures_blocking()`.
3253                        let input_ident = ident_stack.pop().unwrap();
3254
3255                        let futures_ident =
3256                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3257
3258                        match builders_or_callback {
3259                            BuildersOrCallback::Builders(graph_builders) => {
3260                                let builder = graph_builders.get_dfir_mut(&out_location);
3261                                builder.add_dfir(
3262                                    parse_quote! {
3263                                        #futures_ident = #input_ident -> resolve_futures_blocking();
3264                                    },
3265                                    None,
3266                                    Some(&next_stmt_id.to_string()),
3267                                );
3268                            }
3269                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: report the node and its statement id, emit nothing.
3270                                node_callback(node, next_stmt_id);
3271                            }
3272                        }
3273
                            // Consume the statement id and expose the output ident in both modes.
3274                        *next_stmt_id += 1;
3275
3276                        ident_stack.push(futures_ident);
3277                    }
3278
3279                    HydroNode::ResolveFuturesOrdered { .. } => {
                            // Unary node: pops its input ident and, in builder mode, emits
                            // `stream_<id> = <input> -> resolve_futures_ordered()`.
3280                        let input_ident = ident_stack.pop().unwrap();
3281
3282                        let futures_ident =
3283                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3284
3285                        match builders_or_callback {
3286                            BuildersOrCallback::Builders(graph_builders) => {
3287                                let builder = graph_builders.get_dfir_mut(&out_location);
3288                                builder.add_dfir(
3289                                    parse_quote! {
3290                                        #futures_ident = #input_ident -> resolve_futures_ordered();
3291                                    },
3292                                    None,
3293                                    Some(&next_stmt_id.to_string()),
3294                                );
3295                            }
3296                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: report the node and its statement id, emit nothing.
3297                                node_callback(node, next_stmt_id);
3298                            }
3299                        }
3300
                            // Consume the statement id and expose the output ident in both modes.
3301                        *next_stmt_id += 1;
3302
3303                        ident_stack.push(futures_ident);
3304                    }
3305
3306                    HydroNode::Map { f, .. } => {
                            // Unary node: takes the ident on top of `ident_stack` as input and, in
                            // builder mode, emits `stream_<id> = <input> -> map(f)` with the node's
                            // closure `f` spliced in.
3307                        let input_ident = ident_stack.pop().unwrap();
3308
3309                        let map_ident =
3310                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3311
3312                        match builders_or_callback {
3313                            BuildersOrCallback::Builders(graph_builders) => {
3314                                let builder = graph_builders.get_dfir_mut(&out_location);
3315                                builder.add_dfir(
3316                                    parse_quote! {
3317                                        #map_ident = #input_ident -> map(#f);
3318                                    },
3319                                    None,
3320                                    Some(&next_stmt_id.to_string()),
3321                                );
3322                            }
3323                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Nothing is emitted here; the callback just sees the node and id.
3324                                node_callback(node, next_stmt_id);
3325                            }
3326                        }
3327
                            // The id is consumed and the output ident pushed regardless of mode.
3328                        *next_stmt_id += 1;
3329
3330                        ident_stack.push(map_ident);
3331                    }
3332
3333                    HydroNode::FlatMap { f, .. } => {
                            // Unary node: takes the ident on top of `ident_stack` as input and, in
                            // builder mode, emits `stream_<id> = <input> -> flat_map(f)`.
3334                        let input_ident = ident_stack.pop().unwrap();
3335
3336                        let flat_map_ident =
3337                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3338
3339                        match builders_or_callback {
3340                            BuildersOrCallback::Builders(graph_builders) => {
3341                                let builder = graph_builders.get_dfir_mut(&out_location);
3342                                builder.add_dfir(
3343                                    parse_quote! {
3344                                        #flat_map_ident = #input_ident -> flat_map(#f);
3345                                    },
3346                                    None,
3347                                    Some(&next_stmt_id.to_string()),
3348                                );
3349                            }
3350                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Nothing is emitted here; the callback just sees the node and id.
3351                                node_callback(node, next_stmt_id);
3352                            }
3353                        }
3354
                            // The id is consumed and the output ident pushed regardless of mode.
3355                        *next_stmt_id += 1;
3356
3357                        ident_stack.push(flat_map_ident);
3358                    }
3359
3360                    HydroNode::FlatMapStreamBlocking { f, .. } => {
                            // Unary node: takes the ident on top of `ident_stack` as input and, in
                            // builder mode, emits
                            // `stream_<id> = <input> -> flat_map_stream_blocking(f)`.
3361                        let input_ident = ident_stack.pop().unwrap();
3362
3363                        let flat_map_stream_blocking_ident =
3364                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3365
3366                        match builders_or_callback {
3367                            BuildersOrCallback::Builders(graph_builders) => {
3368                                let builder = graph_builders.get_dfir_mut(&out_location);
3369                                builder.add_dfir(
3370                                    parse_quote! {
3371                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3372                                    },
3373                                    None,
3374                                    Some(&next_stmt_id.to_string()),
3375                                );
3376                            }
3377                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Nothing is emitted here; the callback just sees the node and id.
3378                                node_callback(node, next_stmt_id);
3379                            }
3380                        }
3381
                            // The id is consumed and the output ident pushed regardless of mode.
3382                        *next_stmt_id += 1;
3383
3384                        ident_stack.push(flat_map_stream_blocking_ident);
3385                    }
3386
3387                    HydroNode::Filter { f, .. } => {
                            // Unary node: takes the ident on top of `ident_stack` as input and, in
                            // builder mode, emits `stream_<id> = <input> -> filter(f)`.
3388                        let input_ident = ident_stack.pop().unwrap();
3389
3390                        let filter_ident =
3391                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3392
3393                        match builders_or_callback {
3394                            BuildersOrCallback::Builders(graph_builders) => {
3395                                let builder = graph_builders.get_dfir_mut(&out_location);
3396                                builder.add_dfir(
3397                                    parse_quote! {
3398                                        #filter_ident = #input_ident -> filter(#f);
3399                                    },
3400                                    None,
3401                                    Some(&next_stmt_id.to_string()),
3402                                );
3403                            }
3404                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Nothing is emitted here; the callback just sees the node and id.
3405                                node_callback(node, next_stmt_id);
3406                            }
3407                        }
3408
                            // The id is consumed and the output ident pushed regardless of mode.
3409                        *next_stmt_id += 1;
3410
3411                        ident_stack.push(filter_ident);
3412                    }
3413
3414                    HydroNode::FilterMap { f, .. } => {
                            // Unary node: takes the ident on top of `ident_stack` as input and, in
                            // builder mode, emits `stream_<id> = <input> -> filter_map(f)`.
3415                        let input_ident = ident_stack.pop().unwrap();
3416
3417                        let filter_map_ident =
3418                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3419
3420                        match builders_or_callback {
3421                            BuildersOrCallback::Builders(graph_builders) => {
3422                                let builder = graph_builders.get_dfir_mut(&out_location);
3423                                builder.add_dfir(
3424                                    parse_quote! {
3425                                        #filter_map_ident = #input_ident -> filter_map(#f);
3426                                    },
3427                                    None,
3428                                    Some(&next_stmt_id.to_string()),
3429                                );
3430                            }
3431                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Nothing is emitted here; the callback just sees the node and id.
3432                                node_callback(node, next_stmt_id);
3433                            }
3434                        }
3435
                            // The id is consumed and the output ident pushed regardless of mode.
3436                        *next_stmt_id += 1;
3437
3438                        ident_stack.push(filter_map_ident);
3439                    }
3440
3441                    HydroNode::Sort { .. } => {
                            // Unary node with no closure: pops its input ident and, in builder mode,
                            // emits `stream_<id> = <input> -> sort()`.
3442                        let input_ident = ident_stack.pop().unwrap();
3443
3444                        let sort_ident =
3445                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3446
3447                        match builders_or_callback {
3448                            BuildersOrCallback::Builders(graph_builders) => {
3449                                let builder = graph_builders.get_dfir_mut(&out_location);
3450                                builder.add_dfir(
3451                                    parse_quote! {
3452                                        #sort_ident = #input_ident -> sort();
3453                                    },
3454                                    None,
3455                                    Some(&next_stmt_id.to_string()),
3456                                );
3457                            }
3458                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: report the node and its statement id, emit nothing.
3459                                node_callback(node, next_stmt_id);
3460                            }
3461                        }
3462
                            // Consume the statement id and expose the output ident in both modes.
3463                        *next_stmt_id += 1;
3464
3465                        ident_stack.push(sort_ident);
3466                    }
3467
3468                    HydroNode::DeferTick { .. } => {
                            // Unary node: pops its input ident and, in builder mode, emits
                            // `stream_<id> = <input> -> defer_tick_lazy()`. Note the emitted operator is
                            // `defer_tick_lazy`, not an operator named after the variant.
3469                        let input_ident = ident_stack.pop().unwrap();
3470
3471                        let defer_tick_ident =
3472                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3473
3474                        match builders_or_callback {
3475                            BuildersOrCallback::Builders(graph_builders) => {
3476                                let builder = graph_builders.get_dfir_mut(&out_location);
3477                                builder.add_dfir(
3478                                    parse_quote! {
3479                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
3480                                    },
3481                                    None,
3482                                    Some(&next_stmt_id.to_string()),
3483                                );
3484                            }
3485                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: report the node and its statement id, emit nothing.
3486                                node_callback(node, next_stmt_id);
3487                            }
3488                        }
3489
                            // Consume the statement id and expose the output ident in both modes.
3490                        *next_stmt_id += 1;
3491
3492                        ident_stack.push(defer_tick_ident);
3493                    }
3494
3495                    HydroNode::Enumerate { input, .. } => {
                            // Unary node: pops its input ident and, in builder mode, emits
                            // `stream_<id> = <input> -> enumerate::<lifetime>()`.
3496                        let input_ident = ident_stack.pop().unwrap();
3497
3498                        let enumerate_ident =
3499                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3500
3501                        match builders_or_callback {
3502                            BuildersOrCallback::Builders(graph_builders) => {
3503                                let builder = graph_builders.get_dfir_mut(&out_location);
                                    // The lifetime is only needed for the emitted code, so it is computed
                                    // in this branch: `'static` for a top-level input, `'tick` otherwise.
3504                                let lifetime = if input.metadata().location_id.is_top_level() {
3505                                    quote!('static)
3506                                } else {
3507                                    quote!('tick)
3508                                };
3509                                builder.add_dfir(
3510                                    parse_quote! {
3511                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3512                                    },
3513                                    None,
3514                                    Some(&next_stmt_id.to_string()),
3515                                );
3516                            }
3517                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: report the node and its statement id, emit nothing.
3518                                node_callback(node, next_stmt_id);
3519                            }
3520                        }
3521
                            // Consume the statement id and expose the output ident in both modes.
3522                        *next_stmt_id += 1;
3523
3524                        ident_stack.push(enumerate_ident);
3525                    }
3526
3527                    HydroNode::Inspect { f, .. } => {
                            // Unary node: takes the ident on top of `ident_stack` as input and, in
                            // builder mode, emits `stream_<id> = <input> -> inspect(f)`.
3528                        let input_ident = ident_stack.pop().unwrap();
3529
3530                        let inspect_ident =
3531                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3532
3533                        match builders_or_callback {
3534                            BuildersOrCallback::Builders(graph_builders) => {
3535                                let builder = graph_builders.get_dfir_mut(&out_location);
3536                                builder.add_dfir(
3537                                    parse_quote! {
3538                                        #inspect_ident = #input_ident -> inspect(#f);
3539                                    },
3540                                    None,
3541                                    Some(&next_stmt_id.to_string()),
3542                                );
3543                            }
3544                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Nothing is emitted here; the callback just sees the node and id.
3545                                node_callback(node, next_stmt_id);
3546                            }
3547                        }
3548
                            // The id is consumed and the output ident pushed regardless of mode.
3549                        *next_stmt_id += 1;
3550
3551                        ident_stack.push(inspect_ident);
3552                    }
3553
3554                    HydroNode::Unique { input, .. } => {
                            // Unary node: pops its input ident and, in builder mode, emits
                            // `stream_<id> = <input> -> unique::<lifetime>()`.
3555                        let input_ident = ident_stack.pop().unwrap();
3556
3557                        let unique_ident =
3558                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3559
3560                        match builders_or_callback {
3561                            BuildersOrCallback::Builders(graph_builders) => {
3562                                let builder = graph_builders.get_dfir_mut(&out_location);
                                    // The lifetime is only needed for the emitted code, so it is computed
                                    // in this branch: `'static` for a top-level input, `'tick` otherwise.
3563                                let lifetime = if input.metadata().location_id.is_top_level() {
3564                                    quote!('static)
3565                                } else {
3566                                    quote!('tick)
3567                                };
3568
3569                                builder.add_dfir(
3570                                    parse_quote! {
3571                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3572                                    },
3573                                    None,
3574                                    Some(&next_stmt_id.to_string()),
3575                                );
3576                            }
3577                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: report the node and its statement id, emit nothing.
3578                                node_callback(node, next_stmt_id);
3579                            }
3580                        }
3581
                            // Consume the statement id and expose the output ident in both modes.
3582                        *next_stmt_id += 1;
3583
3584                        ident_stack.push(unique_ident);
3585                    }
3586
3587                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
                            // Stateful aggregations that share the `input` / `init` / `acc` shape.
                            //
                            // Operator selection:
                            //   * `Fold`              -> `fold_no_replay` when the input is top-level and
                            //                            bounded, otherwise `fold`
                            //   * `Scan`              -> `scan`
                            //   * `ScanAsyncBlocking` -> `scan_async_blocking`
                            //   * `FoldKeyed`         -> `fold_keyed`, except that a top-level bounded
                            //                            input hits the `todo!` below and panics
                            //
                            // This selection runs before the builder/callback split further down, so
                            // the `todo!` panic is reachable in callback mode as well.
3588                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3589                            if input.metadata().location_id.is_top_level()
3590                                && input.metadata().collection_kind.is_bounded()
3591                            {
3592                                parse_quote!(fold_no_replay)
3593                            } else {
3594                                parse_quote!(fold)
3595                            }
3596                        } else if matches!(node, HydroNode::Scan { .. }) {
3597                            parse_quote!(scan)
3598                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
3599                            parse_quote!(scan_async_blocking)
3600                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3601                            if input.metadata().location_id.is_top_level()
3602                                && input.metadata().collection_kind.is_bounded()
3603                            {
3604                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3605                            } else {
3606                                parse_quote!(fold_keyed)
3607                            }
3608                        } else {
3609                            unreachable!()
3610                        };
3611
3612                        let (HydroNode::Fold { input, .. }
3613                        | HydroNode::FoldKeyed { input, .. }
3614                        | HydroNode::Scan { input, .. }
3615                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
3616                        else {
3617                            unreachable!()
3618                        };
3619
                            // State lifetime follows the input's location: `'static` at top level,
                            // `'tick` otherwise.
3620                        let lifetime = if input.metadata().location_id.is_top_level() {
3621                            quote!('static)
3622                        } else {
3623                            quote!('tick)
3624                        };
3625
3626                        let input_ident = ident_stack.pop().unwrap();
3627
3628                        let (HydroNode::Fold { init, acc, .. }
3629                        | HydroNode::FoldKeyed { init, acc, .. }
3630                        | HydroNode::Scan { init, acc, .. }
3631                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
3632                        else {
3633                            unreachable!()
3634                        };
3635
3636                        let fold_ident =
3637                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3638
3639                        match builders_or_callback {
3640                            BuildersOrCallback::Builders(graph_builders) => {
                                    // Case 1: a `Fold` whose own location is top-level and not
                                    // `LocationId::Atomic`, with `singleton_intermediates()` enabled and a
                                    // non-bounded output. These checks read the node's metadata, whereas
                                    // the operator selection above read the input's metadata.
3641                                if matches!(node, HydroNode::Fold { .. })
3642                                    && node.metadata().location_id.is_top_level()
3643                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3644                                    && graph_builders.singleton_intermediates()
3645                                    && !node.metadata().collection_kind.is_bounded()
3646                                {
3647                                    let builder = graph_builders.get_dfir_mut(&out_location);
3648
                                        // Wrap the user accumulator so that every input produces a clone
                                        // of the updated state as the scan output.
3649                                    let acc: syn::Expr = parse_quote!({
3650                                        let mut __inner = #acc;
3651                                        move |__state, __value| {
3652                                            __inner(__state, __value);
3653                                            Some(__state.clone())
3654                                        }
3655                                    });
3656
                                        // The initial value (from calling `init` once) feeds port 0 of a
                                        // `chain()`, and the scan's running snapshots feed port 1.
                                        // NOTE(review): presumably `chain()` yields port 0 before port 1,
                                        // so the initial value comes first -- confirm in the DFIR docs.
3657                                    builder.add_dfir(
3658                                        parse_quote! {
3659                                            source_iter([(#init)()]) -> [0]#fold_ident;
3660                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3661                                            #fold_ident = chain();
3662                                        },
3663                                        None,
3664                                        Some(&next_stmt_id.to_string()),
3665                                    );
                                    // Case 2: the same conditions, but for `FoldKeyed`.
3666                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3667                                    && node.metadata().location_id.is_top_level()
3668                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3669                                    && graph_builders.singleton_intermediates()
3670                                    && !node.metadata().collection_kind.is_bounded()
3671                                {
3672                                    let builder = graph_builders.get_dfir_mut(&out_location);
3673
                                        // The scan state is a `HashMap` keyed by `__kv.0`. A key's state
                                        // is created from `init` on first sight, updated with `__kv.1`,
                                        // and `(key, cloned state)` is emitted after each update.
3674                                    let acc: syn::Expr = parse_quote!({
3675                                        let mut __init = #init;
3676                                        let mut __inner = #acc;
3677                                        move |__state, __kv: (_, _)| {
3678                                            // TODO(shadaj): we can avoid the clone when the entry exists
3679                                            let __state = __state
3680                                                .entry(::std::clone::Clone::clone(&__kv.0))
3681                                                .or_insert_with(|| (__init)());
3682                                            __inner(__state, __kv.1);
3683                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3684                                        }
3685                                    });
3686
3687                                    builder.add_dfir(
3688                                        parse_quote! {
3689                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3690                                        },
3691                                        None,
3692                                        Some(&next_stmt_id.to_string()),
3693                                    );
3694                                } else {
                                        // Case 3: everything else lowers directly to the operator chosen
                                        // at the top of this arm, with `init` and `acc` passed through.
3695                                    let builder = graph_builders.get_dfir_mut(&out_location);
3696                                    builder.add_dfir(
3697                                        parse_quote! {
3698                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3699                                        },
3700                                        None,
3701                                        Some(&next_stmt_id.to_string()),
3702                                    );
3703                                }
3704                            }
3705                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode emits no DFIR; the node is only reported with its id.
3706                                node_callback(node, next_stmt_id);
3707                            }
3708                        }
3709
                            // The statement id advances and the output ident is pushed in both modes.
3710                        *next_stmt_id += 1;
3711
3712                        ident_stack.push(fold_ident);
3713                    }
3714
3715                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3716                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3717                            if input.metadata().location_id.is_top_level()
3718                                && input.metadata().collection_kind.is_bounded()
3719                            {
3720                                parse_quote!(reduce_no_replay)
3721                            } else {
3722                                parse_quote!(reduce)
3723                            }
3724                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3725                            if input.metadata().location_id.is_top_level()
3726                                && input.metadata().collection_kind.is_bounded()
3727                            {
3728                                todo!(
3729                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3730                                )
3731                            } else {
3732                                parse_quote!(reduce_keyed)
3733                            }
3734                        } else {
3735                            unreachable!()
3736                        };
3737
3738                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3739                        else {
3740                            unreachable!()
3741                        };
3742
3743                        let lifetime = if input.metadata().location_id.is_top_level() {
3744                            quote!('static)
3745                        } else {
3746                            quote!('tick)
3747                        };
3748
3749                        let input_ident = ident_stack.pop().unwrap();
3750
3751                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3752                        else {
3753                            unreachable!()
3754                        };
3755
3756                        let reduce_ident =
3757                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3758
3759                        match builders_or_callback {
3760                            BuildersOrCallback::Builders(graph_builders) => {
3761                                if matches!(node, HydroNode::Reduce { .. })
3762                                    && node.metadata().location_id.is_top_level()
3763                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3764                                    && graph_builders.singleton_intermediates()
3765                                    && !node.metadata().collection_kind.is_bounded()
3766                                {
3767                                    todo!(
3768                                        "Reduce with optional intermediates is not yet supported in simulator"
3769                                    );
3770                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3771                                    && node.metadata().location_id.is_top_level()
3772                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3773                                    && graph_builders.singleton_intermediates()
3774                                    && !node.metadata().collection_kind.is_bounded()
3775                                {
3776                                    todo!(
3777                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3778                                    );
3779                                } else {
3780                                    let builder = graph_builders.get_dfir_mut(&out_location);
3781                                    builder.add_dfir(
3782                                        parse_quote! {
3783                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3784                                        },
3785                                        None,
3786                                        Some(&next_stmt_id.to_string()),
3787                                    );
3788                                }
3789                            }
3790                            BuildersOrCallback::Callback(_, node_callback) => {
3791                                node_callback(node, next_stmt_id);
3792                            }
3793                        }
3794
3795                        *next_stmt_id += 1;
3796
3797                        ident_stack.push(reduce_ident);
3798                    }
3799
3800                    HydroNode::ReduceKeyedWatermark {
3801                        f,
3802                        input,
3803                        metadata,
3804                        ..
3805                    } => {
3806                        let lifetime = if input.metadata().location_id.is_top_level() {
3807                            quote!('static)
3808                        } else {
3809                            quote!('tick)
3810                        };
3811
3812                        // watermark is processed second, so it's on top
3813                        let watermark_ident = ident_stack.pop().unwrap();
3814                        let input_ident = ident_stack.pop().unwrap();
3815
3816                        let chain_ident = syn::Ident::new(
3817                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3818                            Span::call_site(),
3819                        );
3820
3821                        let fold_ident =
3822                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3823
3824                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3825                            && input.metadata().collection_kind.is_bounded()
3826                        {
3827                            parse_quote!(fold_no_replay)
3828                        } else {
3829                            parse_quote!(fold)
3830                        };
3831
3832                        match builders_or_callback {
3833                            BuildersOrCallback::Builders(graph_builders) => {
3834                                if metadata.location_id.is_top_level()
3835                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3836                                    && graph_builders.singleton_intermediates()
3837                                    && !metadata.collection_kind.is_bounded()
3838                                {
3839                                    todo!(
3840                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3841                                    )
3842                                } else {
3843                                    let builder = graph_builders.get_dfir_mut(&out_location);
3844                                    builder.add_dfir(
3845                                        parse_quote! {
3846                                            #chain_ident = chain();
3847                                            #input_ident
3848                                                -> map(|x| (Some(x), None))
3849                                                -> [0]#chain_ident;
3850                                            #watermark_ident
3851                                                -> map(|watermark| (None, Some(watermark)))
3852                                                -> [1]#chain_ident;
3853
3854                                            #fold_ident = #chain_ident
3855                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3856                                                    let __reduce_keyed_fn = #f;
3857                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3858                                                        if let Some((k, v)) = opt_payload {
3859                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3860                                                                if k < curr_watermark {
3861                                                                    return;
3862                                                                }
3863                                                            }
3864                                                            match map.entry(k) {
3865                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3866                                                                    e.insert(v);
3867                                                                }
3868                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3869                                                                    __reduce_keyed_fn(e.get_mut(), v);
3870                                                                }
3871                                                            }
3872                                                        } else {
3873                                                            let watermark = opt_watermark.unwrap();
3874                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3875                                                                if watermark <= curr_watermark {
3876                                                                    return;
3877                                                                }
3878                                                            }
3879                                                            *opt_curr_watermark = opt_watermark;
3880                                                            map.retain(|k, _| *k >= watermark);
3881                                                        }
3882                                                    }
3883                                                })
3884                                                -> flat_map(|(map, _curr_watermark)| map);
3885                                        },
3886                                        None,
3887                                        Some(&next_stmt_id.to_string()),
3888                                    );
3889                                }
3890                            }
3891                            BuildersOrCallback::Callback(_, node_callback) => {
3892                                node_callback(node, next_stmt_id);
3893                            }
3894                        }
3895
3896                        *next_stmt_id += 1;
3897
3898                        ident_stack.push(fold_ident);
3899                    }
3900
3901                    HydroNode::Network {
3902                        networking_info,
3903                        serialize_fn: serialize_pipeline,
3904                        instantiate_fn,
3905                        deserialize_fn: deserialize_pipeline,
3906                        input,
3907                        ..
3908                    } => {
3909                        let input_ident = ident_stack.pop().unwrap();
3910
3911                        let receiver_stream_ident =
3912                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3913
3914                        match builders_or_callback {
3915                            BuildersOrCallback::Builders(graph_builders) => {
3916                                let (sink_expr, source_expr) = match instantiate_fn {
3917                                    DebugInstantiate::Building => (
3918                                        syn::parse_quote!(DUMMY_SINK),
3919                                        syn::parse_quote!(DUMMY_SOURCE),
3920                                    ),
3921
3922                                    DebugInstantiate::Finalized(finalized) => {
3923                                        (finalized.sink.clone(), finalized.source.clone())
3924                                    }
3925                                };
3926
3927                                graph_builders.create_network(
3928                                    &input.metadata().location_id,
3929                                    &out_location,
3930                                    input_ident,
3931                                    &receiver_stream_ident,
3932                                    serialize_pipeline.as_ref(),
3933                                    sink_expr,
3934                                    source_expr,
3935                                    deserialize_pipeline.as_ref(),
3936                                    *next_stmt_id,
3937                                    networking_info,
3938                                );
3939                            }
3940                            BuildersOrCallback::Callback(_, node_callback) => {
3941                                node_callback(node, next_stmt_id);
3942                            }
3943                        }
3944
3945                        *next_stmt_id += 1;
3946
3947                        ident_stack.push(receiver_stream_ident);
3948                    }
3949
3950                    HydroNode::ExternalInput {
3951                        instantiate_fn,
3952                        deserialize_fn: deserialize_pipeline,
3953                        ..
3954                    } => {
3955                        let receiver_stream_ident =
3956                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3957
3958                        match builders_or_callback {
3959                            BuildersOrCallback::Builders(graph_builders) => {
3960                                let (_, source_expr) = match instantiate_fn {
3961                                    DebugInstantiate::Building => (
3962                                        syn::parse_quote!(DUMMY_SINK),
3963                                        syn::parse_quote!(DUMMY_SOURCE),
3964                                    ),
3965
3966                                    DebugInstantiate::Finalized(finalized) => {
3967                                        (finalized.sink.clone(), finalized.source.clone())
3968                                    }
3969                                };
3970
3971                                graph_builders.create_external_source(
3972                                    &out_location,
3973                                    source_expr,
3974                                    &receiver_stream_ident,
3975                                    deserialize_pipeline.as_ref(),
3976                                    *next_stmt_id,
3977                                );
3978                            }
3979                            BuildersOrCallback::Callback(_, node_callback) => {
3980                                node_callback(node, next_stmt_id);
3981                            }
3982                        }
3983
3984                        *next_stmt_id += 1;
3985
3986                        ident_stack.push(receiver_stream_ident);
3987                    }
3988
3989                    HydroNode::Counter {
3990                        tag,
3991                        duration,
3992                        prefix,
3993                        ..
3994                    } => {
3995                        let input_ident = ident_stack.pop().unwrap();
3996
3997                        let counter_ident =
3998                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3999
4000                        match builders_or_callback {
4001                            BuildersOrCallback::Builders(graph_builders) => {
4002                                let arg = format!("{}({})", prefix, tag);
4003                                let builder = graph_builders.get_dfir_mut(&out_location);
4004                                builder.add_dfir(
4005                                    parse_quote! {
4006                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4007                                    },
4008                                    None,
4009                                    Some(&next_stmt_id.to_string()),
4010                                );
4011                            }
4012                            BuildersOrCallback::Callback(_, node_callback) => {
4013                                node_callback(node, next_stmt_id);
4014                            }
4015                        }
4016
4017                        *next_stmt_id += 1;
4018
4019                        ident_stack.push(counter_ident);
4020                    }
4021                }
4022            },
4023            seen_tees,
4024            false,
4025        );
4026
4027        ident_stack
4028            .pop()
4029            .expect("ident_stack should have exactly one element after traversal")
4030    }
4031
4032    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4033        match self {
4034            HydroNode::Placeholder => {
4035                panic!()
4036            }
4037            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4038            HydroNode::Source { source, .. } => match source {
4039                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4040                HydroSource::ExternalNetwork()
4041                | HydroSource::Spin()
4042                | HydroSource::ClusterMembers(_, _)
4043                | HydroSource::Embedded(_)
4044                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4045            },
4046            HydroNode::SingletonSource { value, .. } => {
4047                transform(value);
4048            }
4049            HydroNode::CycleSource { .. }
4050            | HydroNode::Tee { .. }
4051            | HydroNode::YieldConcat { .. }
4052            | HydroNode::BeginAtomic { .. }
4053            | HydroNode::EndAtomic { .. }
4054            | HydroNode::Batch { .. }
4055            | HydroNode::Chain { .. }
4056            | HydroNode::ChainFirst { .. }
4057            | HydroNode::CrossProduct { .. }
4058            | HydroNode::CrossSingleton { .. }
4059            | HydroNode::ResolveFutures { .. }
4060            | HydroNode::ResolveFuturesBlocking { .. }
4061            | HydroNode::ResolveFuturesOrdered { .. }
4062            | HydroNode::Join { .. }
4063            | HydroNode::Difference { .. }
4064            | HydroNode::AntiJoin { .. }
4065            | HydroNode::DeferTick { .. }
4066            | HydroNode::Enumerate { .. }
4067            | HydroNode::Unique { .. }
4068            | HydroNode::Sort { .. } => {}
4069            HydroNode::Map { f, .. }
4070            | HydroNode::FlatMap { f, .. }
4071            | HydroNode::FlatMapStreamBlocking { f, .. }
4072            | HydroNode::Filter { f, .. }
4073            | HydroNode::FilterMap { f, .. }
4074            | HydroNode::Inspect { f, .. }
4075            | HydroNode::Partition { f, .. }
4076            | HydroNode::Reduce { f, .. }
4077            | HydroNode::ReduceKeyed { f, .. }
4078            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4079                transform(f);
4080            }
4081            HydroNode::Fold { init, acc, .. }
4082            | HydroNode::Scan { init, acc, .. }
4083            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4084            | HydroNode::FoldKeyed { init, acc, .. } => {
4085                transform(init);
4086                transform(acc);
4087            }
4088            HydroNode::Network {
4089                serialize_fn,
4090                deserialize_fn,
4091                ..
4092            } => {
4093                if let Some(serialize_fn) = serialize_fn {
4094                    transform(serialize_fn);
4095                }
4096                if let Some(deserialize_fn) = deserialize_fn {
4097                    transform(deserialize_fn);
4098                }
4099            }
4100            HydroNode::ExternalInput { deserialize_fn, .. } => {
4101                if let Some(deserialize_fn) = deserialize_fn {
4102                    transform(deserialize_fn);
4103                }
4104            }
4105            HydroNode::Counter { duration, .. } => {
4106                transform(duration);
4107            }
4108        }
4109    }
4110
4111    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4112        &self.metadata().op
4113    }
4114
4115    pub fn metadata(&self) -> &HydroIrMetadata {
4116        match self {
4117            HydroNode::Placeholder => {
4118                panic!()
4119            }
4120            HydroNode::Cast { metadata, .. } => metadata,
4121            HydroNode::ObserveNonDet { metadata, .. } => metadata,
4122            HydroNode::Source { metadata, .. } => metadata,
4123            HydroNode::SingletonSource { metadata, .. } => metadata,
4124            HydroNode::CycleSource { metadata, .. } => metadata,
4125            HydroNode::Tee { metadata, .. } => metadata,
4126            HydroNode::Partition { metadata, .. } => metadata,
4127            HydroNode::YieldConcat { metadata, .. } => metadata,
4128            HydroNode::BeginAtomic { metadata, .. } => metadata,
4129            HydroNode::EndAtomic { metadata, .. } => metadata,
4130            HydroNode::Batch { metadata, .. } => metadata,
4131            HydroNode::Chain { metadata, .. } => metadata,
4132            HydroNode::ChainFirst { metadata, .. } => metadata,
4133            HydroNode::CrossProduct { metadata, .. } => metadata,
4134            HydroNode::CrossSingleton { metadata, .. } => metadata,
4135            HydroNode::Join { metadata, .. } => metadata,
4136            HydroNode::Difference { metadata, .. } => metadata,
4137            HydroNode::AntiJoin { metadata, .. } => metadata,
4138            HydroNode::ResolveFutures { metadata, .. } => metadata,
4139            HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4140            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4141            HydroNode::Map { metadata, .. } => metadata,
4142            HydroNode::FlatMap { metadata, .. } => metadata,
4143            HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4144            HydroNode::Filter { metadata, .. } => metadata,
4145            HydroNode::FilterMap { metadata, .. } => metadata,
4146            HydroNode::DeferTick { metadata, .. } => metadata,
4147            HydroNode::Enumerate { metadata, .. } => metadata,
4148            HydroNode::Inspect { metadata, .. } => metadata,
4149            HydroNode::Unique { metadata, .. } => metadata,
4150            HydroNode::Sort { metadata, .. } => metadata,
4151            HydroNode::Scan { metadata, .. } => metadata,
4152            HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4153            HydroNode::Fold { metadata, .. } => metadata,
4154            HydroNode::FoldKeyed { metadata, .. } => metadata,
4155            HydroNode::Reduce { metadata, .. } => metadata,
4156            HydroNode::ReduceKeyed { metadata, .. } => metadata,
4157            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4158            HydroNode::ExternalInput { metadata, .. } => metadata,
4159            HydroNode::Network { metadata, .. } => metadata,
4160            HydroNode::Counter { metadata, .. } => metadata,
4161        }
4162    }
4163
4164    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4165        &mut self.metadata_mut().op
4166    }
4167
4168    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4169        match self {
4170            HydroNode::Placeholder => {
4171                panic!()
4172            }
4173            HydroNode::Cast { metadata, .. } => metadata,
4174            HydroNode::ObserveNonDet { metadata, .. } => metadata,
4175            HydroNode::Source { metadata, .. } => metadata,
4176            HydroNode::SingletonSource { metadata, .. } => metadata,
4177            HydroNode::CycleSource { metadata, .. } => metadata,
4178            HydroNode::Tee { metadata, .. } => metadata,
4179            HydroNode::Partition { metadata, .. } => metadata,
4180            HydroNode::YieldConcat { metadata, .. } => metadata,
4181            HydroNode::BeginAtomic { metadata, .. } => metadata,
4182            HydroNode::EndAtomic { metadata, .. } => metadata,
4183            HydroNode::Batch { metadata, .. } => metadata,
4184            HydroNode::Chain { metadata, .. } => metadata,
4185            HydroNode::ChainFirst { metadata, .. } => metadata,
4186            HydroNode::CrossProduct { metadata, .. } => metadata,
4187            HydroNode::CrossSingleton { metadata, .. } => metadata,
4188            HydroNode::Join { metadata, .. } => metadata,
4189            HydroNode::Difference { metadata, .. } => metadata,
4190            HydroNode::AntiJoin { metadata, .. } => metadata,
4191            HydroNode::ResolveFutures { metadata, .. } => metadata,
4192            HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4193            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4194            HydroNode::Map { metadata, .. } => metadata,
4195            HydroNode::FlatMap { metadata, .. } => metadata,
4196            HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4197            HydroNode::Filter { metadata, .. } => metadata,
4198            HydroNode::FilterMap { metadata, .. } => metadata,
4199            HydroNode::DeferTick { metadata, .. } => metadata,
4200            HydroNode::Enumerate { metadata, .. } => metadata,
4201            HydroNode::Inspect { metadata, .. } => metadata,
4202            HydroNode::Unique { metadata, .. } => metadata,
4203            HydroNode::Sort { metadata, .. } => metadata,
4204            HydroNode::Scan { metadata, .. } => metadata,
4205            HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4206            HydroNode::Fold { metadata, .. } => metadata,
4207            HydroNode::FoldKeyed { metadata, .. } => metadata,
4208            HydroNode::Reduce { metadata, .. } => metadata,
4209            HydroNode::ReduceKeyed { metadata, .. } => metadata,
4210            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4211            HydroNode::ExternalInput { metadata, .. } => metadata,
4212            HydroNode::Network { metadata, .. } => metadata,
4213            HydroNode::Counter { metadata, .. } => metadata,
4214        }
4215    }
4216
4217    pub fn input(&self) -> Vec<&HydroNode> {
4218        match self {
4219            HydroNode::Placeholder => {
4220                panic!()
4221            }
4222            HydroNode::Source { .. }
4223            | HydroNode::SingletonSource { .. }
4224            | HydroNode::ExternalInput { .. }
4225            | HydroNode::CycleSource { .. }
4226            | HydroNode::Tee { .. }
4227            | HydroNode::Partition { .. } => {
4228                // Tee/Partition should find their input in separate special ways
4229                vec![]
4230            }
4231            HydroNode::Cast { inner, .. }
4232            | HydroNode::ObserveNonDet { inner, .. }
4233            | HydroNode::YieldConcat { inner, .. }
4234            | HydroNode::BeginAtomic { inner, .. }
4235            | HydroNode::EndAtomic { inner, .. }
4236            | HydroNode::Batch { inner, .. } => {
4237                vec![inner]
4238            }
4239            HydroNode::Chain { first, second, .. } => {
4240                vec![first, second]
4241            }
4242            HydroNode::ChainFirst { first, second, .. } => {
4243                vec![first, second]
4244            }
4245            HydroNode::CrossProduct { left, right, .. }
4246            | HydroNode::CrossSingleton { left, right, .. }
4247            | HydroNode::Join { left, right, .. } => {
4248                vec![left, right]
4249            }
4250            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4251                vec![pos, neg]
4252            }
4253            HydroNode::Map { input, .. }
4254            | HydroNode::FlatMap { input, .. }
4255            | HydroNode::FlatMapStreamBlocking { input, .. }
4256            | HydroNode::Filter { input, .. }
4257            | HydroNode::FilterMap { input, .. }
4258            | HydroNode::Sort { input, .. }
4259            | HydroNode::DeferTick { input, .. }
4260            | HydroNode::Enumerate { input, .. }
4261            | HydroNode::Inspect { input, .. }
4262            | HydroNode::Unique { input, .. }
4263            | HydroNode::Network { input, .. }
4264            | HydroNode::Counter { input, .. }
4265            | HydroNode::ResolveFutures { input, .. }
4266            | HydroNode::ResolveFuturesBlocking { input, .. }
4267            | HydroNode::ResolveFuturesOrdered { input, .. }
4268            | HydroNode::Fold { input, .. }
4269            | HydroNode::FoldKeyed { input, .. }
4270            | HydroNode::Reduce { input, .. }
4271            | HydroNode::ReduceKeyed { input, .. }
4272            | HydroNode::Scan { input, .. }
4273            | HydroNode::ScanAsyncBlocking { input, .. } => {
4274                vec![input]
4275            }
4276            HydroNode::ReduceKeyedWatermark {
4277                input, watermark, ..
4278            } => {
4279                vec![input, watermark]
4280            }
4281        }
4282    }
4283
4284    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4285        self.input()
4286            .iter()
4287            .map(|input_node| input_node.metadata())
4288            .collect()
4289    }
4290
4291    /// Returns `true` if this node is a Tee or Partition whose inner Rc
4292    /// has other live references, meaning the upstream is already driven
4293    /// by another consumer and does not need a Null sink.
4294    pub fn is_shared_with_others(&self) -> bool {
4295        match self {
4296            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4297                Rc::strong_count(&inner.0) > 1
4298            }
4299            _ => false,
4300        }
4301    }
4302
4303    pub fn print_root(&self) -> String {
4304        match self {
4305            HydroNode::Placeholder => {
4306                panic!()
4307            }
4308            HydroNode::Cast { .. } => "Cast()".to_owned(),
4309            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4310            HydroNode::Source { source, .. } => format!("Source({:?})", source),
4311            HydroNode::SingletonSource {
4312                value,
4313                first_tick_only,
4314                ..
4315            } => format!(
4316                "SingletonSource({:?}, first_tick_only={})",
4317                value, first_tick_only
4318            ),
4319            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4320            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4321            HydroNode::Partition { f, is_true, .. } => {
4322                format!("Partition({:?}, is_true={})", f, is_true)
4323            }
4324            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4325            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4326            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4327            HydroNode::Batch { .. } => "Batch()".to_owned(),
4328            HydroNode::Chain { first, second, .. } => {
4329                format!("Chain({}, {})", first.print_root(), second.print_root())
4330            }
4331            HydroNode::ChainFirst { first, second, .. } => {
4332                format!(
4333                    "ChainFirst({}, {})",
4334                    first.print_root(),
4335                    second.print_root()
4336                )
4337            }
4338            HydroNode::CrossProduct { left, right, .. } => {
4339                format!(
4340                    "CrossProduct({}, {})",
4341                    left.print_root(),
4342                    right.print_root()
4343                )
4344            }
4345            HydroNode::CrossSingleton { left, right, .. } => {
4346                format!(
4347                    "CrossSingleton({}, {})",
4348                    left.print_root(),
4349                    right.print_root()
4350                )
4351            }
4352            HydroNode::Join { left, right, .. } => {
4353                format!("Join({}, {})", left.print_root(), right.print_root())
4354            }
4355            HydroNode::Difference { pos, neg, .. } => {
4356                format!("Difference({}, {})", pos.print_root(), neg.print_root())
4357            }
4358            HydroNode::AntiJoin { pos, neg, .. } => {
4359                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4360            }
4361            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4362            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4363            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4364            HydroNode::Map { f, .. } => format!("Map({:?})", f),
4365            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4366            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4367            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4368            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4369            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4370            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4371            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4372            HydroNode::Unique { .. } => "Unique()".to_owned(),
4373            HydroNode::Sort { .. } => "Sort()".to_owned(),
4374            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4375            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4376            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
4377                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
4378            }
4379            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4380            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4381            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4382            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4383            HydroNode::Network { .. } => "Network()".to_owned(),
4384            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4385            HydroNode::Counter { tag, duration, .. } => {
4386                format!("Counter({:?}, {:?})", tag, duration)
4387            }
4388        }
4389    }
4390}
4391
/// Fetches the entry stored under `key` in `nodes`.
///
/// `kind` is the noun spliced into the panic message (`"process"` or
/// `"cluster"`).
///
/// # Panics
///
/// Panics with `A <kind> used in the graph was not instantiated: <key>` when
/// `nodes` has no entry for `key`.
#[cfg(feature = "build")]
fn expect_instantiated_node<'m, T>(
    nodes: &'m SparseSecondaryMap<LocationKey, T>,
    key: LocationKey,
    kind: &str,
) -> &'m T {
    nodes.get(key).unwrap_or_else(|| {
        panic!(
            "A {} used in the graph was not instantiated: {}",
            kind, key
        )
    })
}

/// Builds the sink/source expressions and the connect callback for a network
/// edge going from `from_location` to `to_location`.
///
/// The pair of location kinds selects which family of `Deploy` hooks is used:
///
/// | from      | to        | hooks                                   |
/// |-----------|-----------|-----------------------------------------|
/// | `Process` | `Process` | `D::o2o_sink_source` / `D::o2o_connect` |
/// | `Process` | `Cluster` | `D::o2m_sink_source` / `D::o2m_connect` |
/// | `Cluster` | `Process` | `D::m2o_sink_source` / `D::m2o_connect` |
/// | `Cluster` | `Cluster` | `D::m2m_sink_source` / `D::m2m_connect` |
///
/// For each endpoint the node is looked up in `processes` / `clusters` and a
/// fresh port is taken from it with `next_port()`. `env`, `name` and
/// `networking_info` are forwarded unchanged to the `*_sink_source` hook.
///
/// Returns `(sink, source, connect_fn)`, where `sink` and `source` are the two
/// expressions produced by the `*_sink_source` hook and `connect_fn` is the
/// value returned by the matching `*_connect` hook.
///
/// # Panics
///
/// Panics if a referenced process or cluster is missing from `processes` /
/// `clusters`, or if either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = expect_instantiated_node(processes, from, "process").clone();
            let to_node = expect_instantiated_node(processes, to, "process").clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = expect_instantiated_node(processes, from, "process").clone();
            let to_node = expect_instantiated_node(clusters, to, "cluster").clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = expect_instantiated_node(clusters, from, "cluster").clone();
            let to_node = expect_instantiated_node(processes, to, "process").clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = expect_instantiated_node(clusters, from, "cluster").clone();
            let to_node = expect_instantiated_node(clusters, to, "cluster").clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only process and cluster endpoints have a deploy hook above; tick
        // and atomic locations are rejected outright.
        (LocationId::Tick(..) | LocationId::Atomic(..), _) => {
            panic!("cannot instantiate a network whose source is a tick or atomic location")
        }
        (_, LocationId::Tick(..) | LocationId::Atomic(..)) => {
            panic!("cannot instantiate a network whose destination is a tick or atomic location")
        }
    };
    (sink, source, connect_fn)
}
4533
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so that growth is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 248);
    }

    /// Pins the in-memory size of `HydroRoot` so that growth is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 136);
    }

    /// An expression with no `q!` involvement must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain = syn::parse_str::<syn::Expr>("x + y").unwrap();
        let simplified = simplify_q_macro(plain.clone());
        assert_eq!(simplified, plain);
    }

    /// Runs the simplifier on a real spliced stageleft closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // A small stand-in for what a real stageleft call looks like.
        let spliced_closure = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced_closure);
        // The visitor is expected to recognise the
        // stageleft::runtime_support::fn_* pattern and reduce it to q!(...).
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Covers a quoted body that is a block rather than starting with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced_closure = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced_closure);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}
4589}