hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::compile::builder::ExternalPortId;
25#[cfg(feature = "build")]
26use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
27use crate::location::NetworkHint;
28use crate::location::dynamic::LocationId;
29
30pub mod backtrace;
31use backtrace::Backtrace;
32
33/// Wrapper that displays only the tokens of a parsed expr.
34///
35/// Boxes `syn::Type` which is ~240 bytes.
36#[derive(Clone, Hash)]
37pub struct DebugExpr(pub Box<syn::Expr>);
38
39impl From<syn::Expr> for DebugExpr {
40    fn from(expr: syn::Expr) -> Self {
41        Self(Box::new(expr))
42    }
43}
44
45impl Deref for DebugExpr {
46    type Target = syn::Expr;
47
48    fn deref(&self) -> &Self::Target {
49        &self.0
50    }
51}
52
53impl ToTokens for DebugExpr {
54    fn to_tokens(&self, tokens: &mut TokenStream) {
55        self.0.to_tokens(tokens);
56    }
57}
58
59impl Debug for DebugExpr {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(f, "{}", self.0.to_token_stream())
62    }
63}
64
65impl Display for DebugExpr {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        let original = self.0.as_ref().clone();
68        let simplified = simplify_q_macro(original);
69
70        // For now, just use quote formatting without trying to parse as a statement
71        // This avoids the syn::parse_quote! issues entirely
72        write!(f, "q!({})", quote::quote!(#simplified))
73    }
74}
75
76/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
77fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
78    // Try to parse the token string as a syn::Expr
79    // Use a visitor to simplify q! macro expansions
80    let mut simplifier = QMacroSimplifier::new();
81    simplifier.visit_expr_mut(&mut expr);
82
83    // If we found and simplified a q! macro, return the simplified version
84    if let Some(simplified) = simplifier.simplified_result {
85        simplified
86    } else {
87        expr
88    }
89}
90
91/// AST visitor that simplifies q! macro expansions
92#[derive(Default)]
93pub struct QMacroSimplifier {
94    pub simplified_result: Option<syn::Expr>,
95}
96
97impl QMacroSimplifier {
98    pub fn new() -> Self {
99        Self::default()
100    }
101}
102
103impl VisitMut for QMacroSimplifier {
104    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
105        // Check if we already found a result to avoid further processing
106        if self.simplified_result.is_some() {
107            return;
108        }
109
110        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
111            // Look for calls to stageleft::runtime_support::fn*
112            && self.is_stageleft_runtime_support_call(&path_expr.path)
113            // Try to extract the closure from the arguments
114            && let Some(closure) = self.extract_closure_from_args(&call.args)
115        {
116            self.simplified_result = Some(closure);
117            return;
118        }
119
120        // Continue visiting child expressions using the default implementation
121        // Use the default visitor to avoid infinite recursion
122        syn::visit_mut::visit_expr_mut(self, expr);
123    }
124}
125
126impl QMacroSimplifier {
127    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
128        // Check if this is a call to stageleft::runtime_support::fn*
129        if let Some(last_segment) = path.segments.last() {
130            let fn_name = last_segment.ident.to_string();
131            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
132            fn_name.contains("_type_hint")
133                && path.segments.len() > 2
134                && path.segments[0].ident == "stageleft"
135                && path.segments[1].ident == "runtime_support"
136        } else {
137            false
138        }
139    }
140
141    fn extract_closure_from_args(
142        &self,
143        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
144    ) -> Option<syn::Expr> {
145        // Look through the arguments for a closure expression
146        for arg in args {
147            if let syn::Expr::Closure(_) = arg {
148                return Some(arg.clone());
149            }
150            // Also check for closures nested in other expressions (like blocks)
151            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
152                return Some(closure_expr);
153            }
154        }
155        None
156    }
157
158    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
159        let mut visitor = ClosureFinder {
160            found_closure: None,
161            prefer_inner_blocks: true,
162        };
163        visitor.visit_expr(expr);
164        visitor.found_closure
165    }
166}
167
168/// Visitor that finds closures in expressions with special block handling
169struct ClosureFinder {
170    found_closure: Option<syn::Expr>,
171    prefer_inner_blocks: bool,
172}
173
174impl<'ast> Visit<'ast> for ClosureFinder {
175    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
176        // If we already found a closure, don't continue searching
177        if self.found_closure.is_some() {
178            return;
179        }
180
181        match expr {
182            syn::Expr::Closure(_) => {
183                self.found_closure = Some(expr.clone());
184            }
185            syn::Expr::Block(block) if self.prefer_inner_blocks => {
186                // Special handling for blocks - look for inner blocks that contain closures
187                for stmt in &block.block.stmts {
188                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
189                        && let syn::Expr::Block(_) = stmt_expr
190                    {
191                        // Check if this nested block contains a closure
192                        let mut inner_visitor = ClosureFinder {
193                            found_closure: None,
194                            prefer_inner_blocks: false, // Avoid infinite recursion
195                        };
196                        inner_visitor.visit_expr(stmt_expr);
197                        if inner_visitor.found_closure.is_some() {
198                            // Found a closure in an inner block, return that block
199                            self.found_closure = Some(stmt_expr.clone());
200                            return;
201                        }
202                    }
203                }
204
205                // If no inner block with closure found, continue with normal visitation
206                visit::visit_expr(self, expr);
207
208                // If we found a closure, just return the closure itself, not the whole block
209                // unless we're in the special case where we want the containing block
210                if self.found_closure.is_some() {
211                    // The closure was found during visitation, no need to wrap in block
212                }
213            }
214            _ => {
215                // Use default visitor behavior for all other expressions
216                visit::visit_expr(self, expr);
217            }
218        }
219    }
220}
221
222/// Debug displays the type's tokens.
223///
224/// Boxes `syn::Type` which is ~320 bytes.
225#[derive(Clone, PartialEq, Eq, Hash)]
226pub struct DebugType(pub Box<syn::Type>);
227
228impl From<syn::Type> for DebugType {
229    fn from(t: syn::Type) -> Self {
230        Self(Box::new(t))
231    }
232}
233
234impl Deref for DebugType {
235    type Target = syn::Type;
236
237    fn deref(&self) -> &Self::Target {
238        &self.0
239    }
240}
241
242impl ToTokens for DebugType {
243    fn to_tokens(&self, tokens: &mut TokenStream) {
244        self.0.to_tokens(tokens);
245    }
246}
247
248impl Debug for DebugType {
249    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250        write!(f, "{}", self.0.to_token_stream())
251    }
252}
253
254pub enum DebugInstantiate {
255    Building,
256    Finalized(Box<DebugInstantiateFinalized>),
257}
258
259#[cfg_attr(
260    not(feature = "build"),
261    expect(
262        dead_code,
263        reason = "sink, source unused without `feature = \"build\"`."
264    )
265)]
266pub struct DebugInstantiateFinalized {
267    sink: syn::Expr,
268    source: syn::Expr,
269    connect_fn: Option<Box<dyn FnOnce()>>,
270}
271
272impl From<DebugInstantiateFinalized> for DebugInstantiate {
273    fn from(f: DebugInstantiateFinalized) -> Self {
274        Self::Finalized(Box::new(f))
275    }
276}
277
278impl Debug for DebugInstantiate {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        write!(f, "<network instantiate>")
281    }
282}
283
284impl Hash for DebugInstantiate {
285    fn hash<H: Hasher>(&self, _state: &mut H) {
286        // Do nothing
287    }
288}
289
290impl Clone for DebugInstantiate {
291    fn clone(&self) -> Self {
292        match self {
293            DebugInstantiate::Building => DebugInstantiate::Building,
294            DebugInstantiate::Finalized(_) => {
295                panic!("DebugInstantiate::Finalized should not be cloned")
296            }
297        }
298    }
299}
300
301/// A source in a Hydro graph, where data enters the graph.
302#[derive(Debug, Hash, Clone)]
303pub enum HydroSource {
304    Stream(DebugExpr),
305    ExternalNetwork(),
306    Iter(DebugExpr),
307    Spin(),
308    ClusterMembers(LocationId),
309}
310
311#[cfg(feature = "build")]
312/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
313/// and simulations.
314///
315/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
316/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
317pub trait DfirBuilder {
318    /// Whether the representation of singletons should include intermediate states.
319    fn singleton_intermediates(&self) -> bool;
320
321    /// Gets the DFIR builder for the given location, creating it if necessary.
322    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
323
324    fn batch(
325        &mut self,
326        in_ident: syn::Ident,
327        in_location: &LocationId,
328        in_kind: &CollectionKind,
329        out_ident: &syn::Ident,
330        out_location: &LocationId,
331        op_meta: &HydroIrOpMetadata,
332    );
333    fn yield_from_tick(
334        &mut self,
335        in_ident: syn::Ident,
336        in_location: &LocationId,
337        in_kind: &CollectionKind,
338        out_ident: &syn::Ident,
339        out_location: &LocationId,
340    );
341
342    fn begin_atomic(
343        &mut self,
344        in_ident: syn::Ident,
345        in_location: &LocationId,
346        in_kind: &CollectionKind,
347        out_ident: &syn::Ident,
348        out_location: &LocationId,
349        op_meta: &HydroIrOpMetadata,
350    );
351    fn end_atomic(
352        &mut self,
353        in_ident: syn::Ident,
354        in_location: &LocationId,
355        in_kind: &CollectionKind,
356        out_ident: &syn::Ident,
357    );
358
359    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
360    fn observe_nondet(
361        &mut self,
362        trusted: bool,
363        location: &LocationId,
364        in_ident: syn::Ident,
365        in_kind: &CollectionKind,
366        out_ident: &syn::Ident,
367        out_kind: &CollectionKind,
368        op_meta: &HydroIrOpMetadata,
369    );
370
371    #[expect(clippy::too_many_arguments, reason = "TODO")]
372    fn create_network(
373        &mut self,
374        from: &LocationId,
375        to: &LocationId,
376        input_ident: syn::Ident,
377        out_ident: &syn::Ident,
378        serialize: &Option<DebugExpr>,
379        sink: syn::Expr,
380        source: syn::Expr,
381        deserialize: &Option<DebugExpr>,
382        tag_id: usize,
383    );
384
385    fn create_external_source(
386        &mut self,
387        on: &LocationId,
388        source_expr: syn::Expr,
389        out_ident: &syn::Ident,
390        deserialize: &Option<DebugExpr>,
391        tag_id: usize,
392    );
393
394    fn create_external_output(
395        &mut self,
396        on: &LocationId,
397        sink_expr: syn::Expr,
398        input_ident: &syn::Ident,
399        serialize: &Option<DebugExpr>,
400        tag_id: usize,
401    );
402}
403
404#[cfg(feature = "build")]
405impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
406    fn singleton_intermediates(&self) -> bool {
407        false
408    }
409
410    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
411        self.entry(location.root().raw_id()).or_default()
412    }
413
414    fn batch(
415        &mut self,
416        in_ident: syn::Ident,
417        in_location: &LocationId,
418        in_kind: &CollectionKind,
419        out_ident: &syn::Ident,
420        _out_location: &LocationId,
421        _op_meta: &HydroIrOpMetadata,
422    ) {
423        let builder = self.get_dfir_mut(in_location.root());
424        if in_kind.is_bounded()
425            && matches!(
426                in_kind,
427                CollectionKind::Singleton { .. }
428                    | CollectionKind::Optional { .. }
429                    | CollectionKind::KeyedSingleton { .. }
430            )
431        {
432            assert!(in_location.is_top_level());
433            builder.add_dfir(
434                parse_quote! {
435                    #out_ident = #in_ident -> persist::<'static>();
436                },
437                None,
438                None,
439            );
440        } else {
441            builder.add_dfir(
442                parse_quote! {
443                    #out_ident = #in_ident;
444                },
445                None,
446                None,
447            );
448        }
449    }
450
451    fn yield_from_tick(
452        &mut self,
453        in_ident: syn::Ident,
454        in_location: &LocationId,
455        _in_kind: &CollectionKind,
456        out_ident: &syn::Ident,
457        _out_location: &LocationId,
458    ) {
459        let builder = self.get_dfir_mut(in_location.root());
460        builder.add_dfir(
461            parse_quote! {
462                #out_ident = #in_ident;
463            },
464            None,
465            None,
466        );
467    }
468
469    fn begin_atomic(
470        &mut self,
471        in_ident: syn::Ident,
472        in_location: &LocationId,
473        _in_kind: &CollectionKind,
474        out_ident: &syn::Ident,
475        _out_location: &LocationId,
476        _op_meta: &HydroIrOpMetadata,
477    ) {
478        let builder = self.get_dfir_mut(in_location.root());
479        builder.add_dfir(
480            parse_quote! {
481                #out_ident = #in_ident;
482            },
483            None,
484            None,
485        );
486    }
487
488    fn end_atomic(
489        &mut self,
490        in_ident: syn::Ident,
491        in_location: &LocationId,
492        _in_kind: &CollectionKind,
493        out_ident: &syn::Ident,
494    ) {
495        let builder = self.get_dfir_mut(in_location.root());
496        builder.add_dfir(
497            parse_quote! {
498                #out_ident = #in_ident;
499            },
500            None,
501            None,
502        );
503    }
504
505    fn observe_nondet(
506        &mut self,
507        _trusted: bool,
508        location: &LocationId,
509        in_ident: syn::Ident,
510        _in_kind: &CollectionKind,
511        out_ident: &syn::Ident,
512        _out_kind: &CollectionKind,
513        _op_meta: &HydroIrOpMetadata,
514    ) {
515        let builder = self.get_dfir_mut(location);
516        builder.add_dfir(
517            parse_quote! {
518                #out_ident = #in_ident;
519            },
520            None,
521            None,
522        );
523    }
524
525    fn create_network(
526        &mut self,
527        from: &LocationId,
528        to: &LocationId,
529        input_ident: syn::Ident,
530        out_ident: &syn::Ident,
531        serialize: &Option<DebugExpr>,
532        sink: syn::Expr,
533        source: syn::Expr,
534        deserialize: &Option<DebugExpr>,
535        tag_id: usize,
536    ) {
537        let sender_builder = self.get_dfir_mut(from);
538        if let Some(serialize_pipeline) = serialize {
539            sender_builder.add_dfir(
540                parse_quote! {
541                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
542                },
543                None,
544                // operator tag separates send and receive, which otherwise have the same next_stmt_id
545                Some(&format!("send{}", tag_id)),
546            );
547        } else {
548            sender_builder.add_dfir(
549                parse_quote! {
550                    #input_ident -> dest_sink(#sink);
551                },
552                None,
553                Some(&format!("send{}", tag_id)),
554            );
555        }
556
557        let receiver_builder = self.get_dfir_mut(to);
558        if let Some(deserialize_pipeline) = deserialize {
559            receiver_builder.add_dfir(
560                parse_quote! {
561                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
562                },
563                None,
564                Some(&format!("recv{}", tag_id)),
565            );
566        } else {
567            receiver_builder.add_dfir(
568                parse_quote! {
569                    #out_ident = source_stream(#source);
570                },
571                None,
572                Some(&format!("recv{}", tag_id)),
573            );
574        }
575    }
576
577    fn create_external_source(
578        &mut self,
579        on: &LocationId,
580        source_expr: syn::Expr,
581        out_ident: &syn::Ident,
582        deserialize: &Option<DebugExpr>,
583        tag_id: usize,
584    ) {
585        let receiver_builder = self.get_dfir_mut(on);
586        if let Some(deserialize_pipeline) = deserialize {
587            receiver_builder.add_dfir(
588                parse_quote! {
589                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
590                },
591                None,
592                Some(&format!("recv{}", tag_id)),
593            );
594        } else {
595            receiver_builder.add_dfir(
596                parse_quote! {
597                    #out_ident = source_stream(#source_expr);
598                },
599                None,
600                Some(&format!("recv{}", tag_id)),
601            );
602        }
603    }
604
605    fn create_external_output(
606        &mut self,
607        on: &LocationId,
608        sink_expr: syn::Expr,
609        input_ident: &syn::Ident,
610        serialize: &Option<DebugExpr>,
611        tag_id: usize,
612    ) {
613        let sender_builder = self.get_dfir_mut(on);
614        if let Some(serialize_fn) = serialize {
615            sender_builder.add_dfir(
616                parse_quote! {
617                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
618                },
619                None,
620                // operator tag separates send and receive, which otherwise have the same next_stmt_id
621                Some(&format!("send{}", tag_id)),
622            );
623        } else {
624            sender_builder.add_dfir(
625                parse_quote! {
626                    #input_ident -> dest_sink(#sink_expr);
627                },
628                None,
629                Some(&format!("send{}", tag_id)),
630            );
631        }
632    }
633}
634
635#[cfg(feature = "build")]
636pub enum BuildersOrCallback<'a, L, N>
637where
638    L: FnMut(&mut HydroRoot, &mut usize),
639    N: FnMut(&mut HydroNode, &mut usize),
640{
641    Builders(&'a mut dyn DfirBuilder),
642    Callback(L, N),
643}
644
645/// An root in a Hydro graph, which is an pipeline that doesn't emit
646/// any downstream values. Traversals over the dataflow graph and
647/// generating DFIR IR start from roots.
648#[derive(Debug, Hash)]
649pub enum HydroRoot {
650    ForEach {
651        f: DebugExpr,
652        input: Box<HydroNode>,
653        op_metadata: HydroIrOpMetadata,
654    },
655    SendExternal {
656        to_external_id: usize,
657        to_port_id: ExternalPortId,
658        to_many: bool,
659        unpaired: bool,
660        serialize_fn: Option<DebugExpr>,
661        instantiate_fn: DebugInstantiate,
662        input: Box<HydroNode>,
663        op_metadata: HydroIrOpMetadata,
664    },
665    DestSink {
666        sink: DebugExpr,
667        input: Box<HydroNode>,
668        op_metadata: HydroIrOpMetadata,
669    },
670    CycleSink {
671        ident: syn::Ident,
672        input: Box<HydroNode>,
673        op_metadata: HydroIrOpMetadata,
674    },
675}
676
677impl HydroRoot {
678    #[cfg(feature = "build")]
679    pub fn compile_network<'a, D>(
680        &mut self,
681        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
682        seen_tees: &mut SeenTees,
683        processes: &HashMap<usize, D::Process>,
684        clusters: &HashMap<usize, D::Cluster>,
685        externals: &HashMap<usize, D::External>,
686    ) where
687        D: Deploy<'a>,
688    {
689        let refcell_extra_stmts = RefCell::new(extra_stmts);
690        self.transform_bottom_up(
691            &mut |l| {
692                if let HydroRoot::SendExternal {
693                    input,
694                    to_external_id,
695                    to_port_id,
696                    to_many,
697                    unpaired,
698                    instantiate_fn,
699                    ..
700                } = l
701                {
702                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
703                        DebugInstantiate::Building => {
704                            let to_node = externals
705                                .get(to_external_id)
706                                .unwrap_or_else(|| {
707                                    panic!("A external used in the graph was not instantiated: {}", to_external_id)
708                                })
709                                .clone();
710
711                            match input.metadata().location_kind.root() {
712                                LocationId::Process(process_id) => {
713                                    if *to_many {
714                                        (
715                                            (
716                                                D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_port_id)),
717                                                parse_quote!(DUMMY),
718                                            ),
719                                            Box::new(|| {}) as Box<dyn FnOnce()>,
720                                        )
721                                    } else {
722                                        let from_node = processes
723                                            .get(process_id)
724                                            .unwrap_or_else(|| {
725                                                panic!("A process used in the graph was not instantiated: {}", process_id)
726                                            })
727                                            .clone();
728
729                                        let sink_port = from_node.next_port();
730                                        let source_port = to_node.next_port();
731
732                                        if *unpaired {
733                                            use stageleft::quote_type;
734                                            use tokio_util::codec::LengthDelimitedCodec;
735
736                                            to_node.register(*to_port_id, source_port.clone());
737
738                                            let _ = D::e2o_source(
739                                                refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
740                                                &to_node, &source_port,
741                                                &from_node, &sink_port,
742                                                &quote_type::<LengthDelimitedCodec>(),
743                                                format!("{}_{}", *to_external_id, *to_port_id)
744                                            );
745                                        }
746
747                                        (
748                                            (
749                                                D::o2e_sink(
750                                                    &from_node,
751                                                    &sink_port,
752                                                    &to_node,
753                                                    &source_port,
754                                                    format!("{}_{}", *to_external_id, *to_port_id)
755                                                ),
756                                                parse_quote!(DUMMY),
757                                            ),
758                                            if *unpaired {
759                                                D::e2o_connect(
760                                                    &to_node,
761                                                    &source_port,
762                                                    &from_node,
763                                                    &sink_port,
764                                                    *to_many,
765                                                    NetworkHint::Auto,
766                                                )
767                                            } else {
768                                                Box::new(|| {}) as Box<dyn FnOnce()>
769                                            },
770                                        )
771                                    }
772                                }
773                                LocationId::Cluster(_) => todo!(),
774                                _ => panic!()
775                            }
776                        },
777
778                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
779                    };
780
781                    *instantiate_fn = DebugInstantiateFinalized {
782                        sink: sink_expr,
783                        source: source_expr,
784                        connect_fn: Some(connect_fn),
785                    }
786                    .into();
787                }
788            },
789            &mut |n| {
790                if let HydroNode::Network {
791                    input,
792                    instantiate_fn,
793                    metadata,
794                    ..
795                } = n
796                {
797                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
798                        DebugInstantiate::Building => instantiate_network::<D>(
799                            input.metadata().location_kind.root(),
800                            metadata.location_kind.root(),
801                            processes,
802                            clusters,
803                        ),
804
805                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
806                    };
807
808                    *instantiate_fn = DebugInstantiateFinalized {
809                        sink: sink_expr,
810                        source: source_expr,
811                        connect_fn: Some(connect_fn),
812                    }
813                    .into();
814                } else if let HydroNode::ExternalInput {
815                    from_external_id,
816                    from_port_id,
817                    from_many,
818                    codec_type,
819                    port_hint,
820                    instantiate_fn,
821                    metadata,
822                    ..
823                } = n
824                {
825                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
826                        DebugInstantiate::Building => {
827                            let from_node = externals
828                                .get(from_external_id)
829                                .unwrap_or_else(|| {
830                                    panic!(
831                                        "A external used in the graph was not instantiated: {}",
832                                        from_external_id
833                                    )
834                                })
835                                .clone();
836
837                            match metadata.location_kind.root() {
838                                LocationId::Process(process_id) => {
839                                    let to_node = processes
840                                        .get(process_id)
841                                        .unwrap_or_else(|| {
842                                            panic!("A process used in the graph was not instantiated: {}", process_id)
843                                        })
844                                        .clone();
845
846                                    let sink_port = from_node.next_port();
847                                    let source_port = to_node.next_port();
848
849                                    from_node.register(*from_port_id, sink_port.clone());
850
851                                    (
852                                        (
853                                            parse_quote!(DUMMY),
854                                            if *from_many {
855                                                D::e2o_many_source(
856                                                    refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
857                                                    &to_node, &source_port,
858                                                    codec_type.0.as_ref(),
859                                                    format!("{}_{}", *from_external_id, *from_port_id)
860                                                )
861                                            } else {
862                                                D::e2o_source(
863                                                    refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
864                                                    &from_node, &sink_port,
865                                                    &to_node, &source_port,
866                                                    codec_type.0.as_ref(),
867                                                    format!("{}_{}", *from_external_id, *from_port_id)
868                                                )
869                                            },
870                                        ),
871                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
872                                    )
873                                }
874                                LocationId::Cluster(_) => todo!(),
875                                _ => panic!()
876                            }
877                        },
878
879                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
880                    };
881
882                    *instantiate_fn = DebugInstantiateFinalized {
883                        sink: sink_expr,
884                        source: source_expr,
885                        connect_fn: Some(connect_fn),
886                    }
887                    .into();
888                }
889            },
890            seen_tees,
891            false,
892        );
893    }
894
895    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
896        self.transform_bottom_up(
897            &mut |l| {
898                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
899                    match instantiate_fn {
900                        DebugInstantiate::Building => panic!("network not built"),
901
902                        DebugInstantiate::Finalized(finalized) => {
903                            (finalized.connect_fn.take().unwrap())();
904                        }
905                    }
906                }
907            },
908            &mut |n| {
909                if let HydroNode::Network { instantiate_fn, .. }
910                | HydroNode::ExternalInput { instantiate_fn, .. } = n
911                {
912                    match instantiate_fn {
913                        DebugInstantiate::Building => panic!("network not built"),
914
915                        DebugInstantiate::Finalized(finalized) => {
916                            (finalized.connect_fn.take().unwrap())();
917                        }
918                    }
919                }
920            },
921            seen_tees,
922            false,
923        );
924    }
925
926    pub fn transform_bottom_up(
927        &mut self,
928        transform_root: &mut impl FnMut(&mut HydroRoot),
929        transform_node: &mut impl FnMut(&mut HydroNode),
930        seen_tees: &mut SeenTees,
931        check_well_formed: bool,
932    ) {
933        self.transform_children(
934            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
935            seen_tees,
936        );
937
938        transform_root(self);
939    }
940
941    pub fn transform_children(
942        &mut self,
943        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
944        seen_tees: &mut SeenTees,
945    ) {
946        match self {
947            HydroRoot::ForEach { input, .. }
948            | HydroRoot::SendExternal { input, .. }
949            | HydroRoot::DestSink { input, .. }
950            | HydroRoot::CycleSink { input, .. } => {
951                transform(input, seen_tees);
952            }
953        }
954    }
955
956    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
957        match self {
958            HydroRoot::ForEach {
959                f,
960                input,
961                op_metadata,
962            } => HydroRoot::ForEach {
963                f: f.clone(),
964                input: Box::new(input.deep_clone(seen_tees)),
965                op_metadata: op_metadata.clone(),
966            },
967            HydroRoot::SendExternal {
968                to_external_id,
969                to_port_id,
970                to_many,
971                unpaired,
972                serialize_fn,
973                instantiate_fn,
974                input,
975                op_metadata,
976            } => HydroRoot::SendExternal {
977                to_external_id: *to_external_id,
978                to_port_id: *to_port_id,
979                to_many: *to_many,
980                unpaired: *unpaired,
981                serialize_fn: serialize_fn.clone(),
982                instantiate_fn: instantiate_fn.clone(),
983                input: Box::new(input.deep_clone(seen_tees)),
984                op_metadata: op_metadata.clone(),
985            },
986            HydroRoot::DestSink {
987                sink,
988                input,
989                op_metadata,
990            } => HydroRoot::DestSink {
991                sink: sink.clone(),
992                input: Box::new(input.deep_clone(seen_tees)),
993                op_metadata: op_metadata.clone(),
994            },
995            HydroRoot::CycleSink {
996                ident,
997                input,
998                op_metadata,
999            } => HydroRoot::CycleSink {
1000                ident: ident.clone(),
1001                input: Box::new(input.deep_clone(seen_tees)),
1002                op_metadata: op_metadata.clone(),
1003            },
1004        }
1005    }
1006
1007    #[cfg(feature = "build")]
1008    pub fn emit<'a, D: Deploy<'a>>(
1009        &mut self,
1010        graph_builders: &mut dyn DfirBuilder,
1011        seen_tees: &mut SeenTees,
1012        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1013        next_stmt_id: &mut usize,
1014    ) {
1015        self.emit_core::<D>(
1016            &mut BuildersOrCallback::<
1017                fn(&mut HydroRoot, &mut usize),
1018                fn(&mut HydroNode, &mut usize),
1019            >::Builders(graph_builders),
1020            seen_tees,
1021            built_tees,
1022            next_stmt_id,
1023        );
1024    }
1025
1026    #[cfg(feature = "build")]
1027    pub fn emit_core<'a, D: Deploy<'a>>(
1028        &mut self,
1029        builders_or_callback: &mut BuildersOrCallback<
1030            impl FnMut(&mut HydroRoot, &mut usize),
1031            impl FnMut(&mut HydroNode, &mut usize),
1032        >,
1033        seen_tees: &mut SeenTees,
1034        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1035        next_stmt_id: &mut usize,
1036    ) {
1037        match self {
1038            HydroRoot::ForEach { f, input, .. } => {
1039                let input_ident =
1040                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1041
1042                match builders_or_callback {
1043                    BuildersOrCallback::Builders(graph_builders) => {
1044                        graph_builders
1045                            .get_dfir_mut(&input.metadata().location_kind)
1046                            .add_dfir(
1047                                parse_quote! {
1048                                    #input_ident -> for_each(#f);
1049                                },
1050                                None,
1051                                Some(&next_stmt_id.to_string()),
1052                            );
1053                    }
1054                    BuildersOrCallback::Callback(leaf_callback, _) => {
1055                        leaf_callback(self, next_stmt_id);
1056                    }
1057                }
1058
1059                *next_stmt_id += 1;
1060            }
1061
1062            HydroRoot::SendExternal {
1063                serialize_fn,
1064                instantiate_fn,
1065                input,
1066                ..
1067            } => {
1068                let input_ident =
1069                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1070
1071                match builders_or_callback {
1072                    BuildersOrCallback::Builders(graph_builders) => {
1073                        let (sink_expr, _) = match instantiate_fn {
1074                            DebugInstantiate::Building => (
1075                                syn::parse_quote!(DUMMY_SINK),
1076                                syn::parse_quote!(DUMMY_SOURCE),
1077                            ),
1078
1079                            DebugInstantiate::Finalized(finalized) => {
1080                                (finalized.sink.clone(), finalized.source.clone())
1081                            }
1082                        };
1083
1084                        graph_builders.create_external_output(
1085                            &input.metadata().location_kind,
1086                            sink_expr,
1087                            &input_ident,
1088                            serialize_fn,
1089                            *next_stmt_id,
1090                        );
1091                    }
1092                    BuildersOrCallback::Callback(leaf_callback, _) => {
1093                        leaf_callback(self, next_stmt_id);
1094                    }
1095                }
1096
1097                *next_stmt_id += 1;
1098            }
1099
1100            HydroRoot::DestSink { sink, input, .. } => {
1101                let input_ident =
1102                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1103
1104                match builders_or_callback {
1105                    BuildersOrCallback::Builders(graph_builders) => {
1106                        graph_builders
1107                            .get_dfir_mut(&input.metadata().location_kind)
1108                            .add_dfir(
1109                                parse_quote! {
1110                                    #input_ident -> dest_sink(#sink);
1111                                },
1112                                None,
1113                                Some(&next_stmt_id.to_string()),
1114                            );
1115                    }
1116                    BuildersOrCallback::Callback(leaf_callback, _) => {
1117                        leaf_callback(self, next_stmt_id);
1118                    }
1119                }
1120
1121                *next_stmt_id += 1;
1122            }
1123
1124            HydroRoot::CycleSink { ident, input, .. } => {
1125                let input_ident =
1126                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1127
1128                match builders_or_callback {
1129                    BuildersOrCallback::Builders(graph_builders) => {
1130                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1131                            CollectionKind::KeyedSingleton {
1132                                key_type,
1133                                value_type,
1134                                ..
1135                            }
1136                            | CollectionKind::KeyedStream {
1137                                key_type,
1138                                value_type,
1139                                ..
1140                            } => {
1141                                parse_quote!((#key_type, #value_type))
1142                            }
1143                            CollectionKind::Stream { element_type, .. }
1144                            | CollectionKind::Singleton { element_type, .. }
1145                            | CollectionKind::Optional { element_type, .. } => {
1146                                parse_quote!(#element_type)
1147                            }
1148                        };
1149
1150                        graph_builders
1151                            .get_dfir_mut(&input.metadata().location_kind)
1152                            .add_dfir(
1153                                parse_quote! {
1154                                    #ident = #input_ident -> identity::<#elem_type>();
1155                                },
1156                                None,
1157                                None,
1158                            );
1159                    }
1160                    // No ID, no callback
1161                    BuildersOrCallback::Callback(_, _) => {}
1162                }
1163            }
1164        }
1165    }
1166
1167    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1168        match self {
1169            HydroRoot::ForEach { op_metadata, .. }
1170            | HydroRoot::SendExternal { op_metadata, .. }
1171            | HydroRoot::DestSink { op_metadata, .. }
1172            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1173        }
1174    }
1175
1176    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1177        match self {
1178            HydroRoot::ForEach { op_metadata, .. }
1179            | HydroRoot::SendExternal { op_metadata, .. }
1180            | HydroRoot::DestSink { op_metadata, .. }
1181            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1182        }
1183    }
1184
1185    pub fn input(&self) -> &HydroNode {
1186        match self {
1187            HydroRoot::ForEach { input, .. }
1188            | HydroRoot::SendExternal { input, .. }
1189            | HydroRoot::DestSink { input, .. }
1190            | HydroRoot::CycleSink { input, .. } => input,
1191        }
1192    }
1193
1194    pub fn input_metadata(&self) -> &HydroIrMetadata {
1195        self.input().metadata()
1196    }
1197
1198    pub fn print_root(&self) -> String {
1199        match self {
1200            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1201            HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1202            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1203            HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1204        }
1205    }
1206
1207    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1208        match self {
1209            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1210                transform(f);
1211            }
1212            HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1213        }
1214    }
1215}
1216
1217#[cfg(feature = "build")]
1218pub fn emit<'a, D: Deploy<'a>>(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
1219    let mut builders = BTreeMap::new();
1220    let mut seen_tees = HashMap::new();
1221    let mut built_tees = HashMap::new();
1222    let mut next_stmt_id = 0;
1223    for leaf in ir {
1224        leaf.emit::<D>(
1225            &mut builders,
1226            &mut seen_tees,
1227            &mut built_tees,
1228            &mut next_stmt_id,
1229        );
1230    }
1231    builders
1232}
1233
1234#[cfg(feature = "build")]
1235pub fn traverse_dfir<'a, D: Deploy<'a>>(
1236    ir: &mut [HydroRoot],
1237    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1238    transform_node: impl FnMut(&mut HydroNode, &mut usize),
1239) {
1240    let mut seen_tees = HashMap::new();
1241    let mut built_tees = HashMap::new();
1242    let mut next_stmt_id = 0;
1243    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1244    ir.iter_mut().for_each(|leaf| {
1245        leaf.emit_core::<D>(
1246            &mut callback,
1247            &mut seen_tees,
1248            &mut built_tees,
1249            &mut next_stmt_id,
1250        );
1251    });
1252}
1253
1254pub fn transform_bottom_up(
1255    ir: &mut [HydroRoot],
1256    transform_root: &mut impl FnMut(&mut HydroRoot),
1257    transform_node: &mut impl FnMut(&mut HydroNode),
1258    check_well_formed: bool,
1259) {
1260    let mut seen_tees = HashMap::new();
1261    ir.iter_mut().for_each(|leaf| {
1262        leaf.transform_bottom_up(
1263            transform_root,
1264            transform_node,
1265            &mut seen_tees,
1266            check_well_formed,
1267        );
1268    });
1269}
1270
1271pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1272    let mut seen_tees = HashMap::new();
1273    ir.iter()
1274        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1275        .collect()
1276}
1277
1278type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1279thread_local! {
1280    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1281}
1282
1283pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1284    PRINTED_TEES.with(|printed_tees| {
1285        let mut printed_tees_mut = printed_tees.borrow_mut();
1286        *printed_tees_mut = Some((0, HashMap::new()));
1287        drop(printed_tees_mut);
1288
1289        let ret = f();
1290
1291        let mut printed_tees_mut = printed_tees.borrow_mut();
1292        *printed_tees_mut = None;
1293
1294        ret
1295    })
1296}
1297
1298pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1299
1300impl TeeNode {
1301    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1302        Rc::as_ptr(&self.0)
1303    }
1304}
1305
1306impl Debug for TeeNode {
1307    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1308        PRINTED_TEES.with(|printed_tees| {
1309            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1310            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1311
1312            if let Some(printed_tees_mut) = printed_tees_mut {
1313                if let Some(existing) = printed_tees_mut
1314                    .1
1315                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1316                {
1317                    write!(f, "<tee {}>", existing)
1318                } else {
1319                    let next_id = printed_tees_mut.0;
1320                    printed_tees_mut.0 += 1;
1321                    printed_tees_mut
1322                        .1
1323                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1324                    drop(printed_tees_mut_borrow);
1325                    write!(f, "<tee {}>: ", next_id)?;
1326                    Debug::fmt(&self.0.borrow(), f)
1327                }
1328            } else {
1329                drop(printed_tees_mut_borrow);
1330                write!(f, "<tee>: ")?;
1331                Debug::fmt(&self.0.borrow(), f)
1332            }
1333        })
1334    }
1335}
1336
1337impl Hash for TeeNode {
1338    fn hash<H: Hasher>(&self, state: &mut H) {
1339        self.0.borrow_mut().hash(state);
1340    }
1341}
1342
1343#[derive(Clone, PartialEq, Eq, Debug)]
1344pub enum BoundKind {
1345    Unbounded,
1346    Bounded,
1347}
1348
1349#[derive(Clone, PartialEq, Eq, Debug)]
1350pub enum StreamOrder {
1351    NoOrder,
1352    TotalOrder,
1353}
1354
1355#[derive(Clone, PartialEq, Eq, Debug)]
1356pub enum StreamRetry {
1357    AtLeastOnce,
1358    ExactlyOnce,
1359}
1360
1361#[derive(Clone, PartialEq, Eq, Debug)]
1362pub enum KeyedSingletonBoundKind {
1363    Unbounded,
1364    BoundedValue,
1365    Bounded,
1366}
1367
1368#[derive(Clone, PartialEq, Eq, Debug)]
1369pub enum CollectionKind {
1370    Stream {
1371        bound: BoundKind,
1372        order: StreamOrder,
1373        retry: StreamRetry,
1374        element_type: DebugType,
1375    },
1376    Singleton {
1377        bound: BoundKind,
1378        element_type: DebugType,
1379    },
1380    Optional {
1381        bound: BoundKind,
1382        element_type: DebugType,
1383    },
1384    KeyedStream {
1385        bound: BoundKind,
1386        value_order: StreamOrder,
1387        value_retry: StreamRetry,
1388        key_type: DebugType,
1389        value_type: DebugType,
1390    },
1391    KeyedSingleton {
1392        bound: KeyedSingletonBoundKind,
1393        key_type: DebugType,
1394        value_type: DebugType,
1395    },
1396}
1397
1398impl CollectionKind {
1399    pub fn is_bounded(&self) -> bool {
1400        matches!(
1401            self,
1402            CollectionKind::Stream {
1403                bound: BoundKind::Bounded,
1404                ..
1405            } | CollectionKind::Singleton {
1406                bound: BoundKind::Bounded,
1407                ..
1408            } | CollectionKind::Optional {
1409                bound: BoundKind::Bounded,
1410                ..
1411            } | CollectionKind::KeyedStream {
1412                bound: BoundKind::Bounded,
1413                ..
1414            } | CollectionKind::KeyedSingleton {
1415                bound: KeyedSingletonBoundKind::Bounded,
1416                ..
1417            }
1418        )
1419    }
1420}
1421
1422#[derive(Clone)]
1423pub struct HydroIrMetadata {
1424    pub location_kind: LocationId,
1425    pub collection_kind: CollectionKind,
1426    pub cardinality: Option<usize>,
1427    pub tag: Option<String>,
1428    pub op: HydroIrOpMetadata,
1429}
1430
1431// HydroIrMetadata shouldn't be used to hash or compare
1432impl Hash for HydroIrMetadata {
1433    fn hash<H: Hasher>(&self, _: &mut H) {}
1434}
1435
1436impl PartialEq for HydroIrMetadata {
1437    fn eq(&self, _: &Self) -> bool {
1438        true
1439    }
1440}
1441
1442impl Eq for HydroIrMetadata {}
1443
1444impl Debug for HydroIrMetadata {
1445    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1446        f.debug_struct("HydroIrMetadata")
1447            .field("location_kind", &self.location_kind)
1448            .field("collection_kind", &self.collection_kind)
1449            .finish()
1450    }
1451}
1452
1453/// Metadata that is specific to the operator itself, rather than its outputs.
1454/// This is available on _both_ inner nodes and roots.
1455#[derive(Clone)]
1456pub struct HydroIrOpMetadata {
1457    pub backtrace: Backtrace,
1458    pub cpu_usage: Option<f64>,
1459    pub network_recv_cpu_usage: Option<f64>,
1460    pub id: Option<usize>,
1461}
1462
1463impl HydroIrOpMetadata {
1464    #[expect(
1465        clippy::new_without_default,
1466        reason = "explicit calls to new ensure correct backtrace bounds"
1467    )]
1468    pub fn new() -> HydroIrOpMetadata {
1469        Self::new_with_skip(1)
1470    }
1471
1472    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1473        HydroIrOpMetadata {
1474            backtrace: Backtrace::get_backtrace(2 + skip_count),
1475            cpu_usage: None,
1476            network_recv_cpu_usage: None,
1477            id: None,
1478        }
1479    }
1480}
1481
1482impl Debug for HydroIrOpMetadata {
1483    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1484        f.debug_struct("HydroIrOpMetadata").finish()
1485    }
1486}
1487
1488impl Hash for HydroIrOpMetadata {
1489    fn hash<H: Hasher>(&self, _: &mut H) {}
1490}
1491
1492/// An intermediate node in a Hydro graph, which consumes data
1493/// from upstream nodes and emits data to downstream nodes.
1494#[derive(Debug, Hash)]
1495pub enum HydroNode {
1496    Placeholder,
1497
1498    /// Manually "casts" between two different collection kinds.
1499    ///
1500    /// Using this IR node requires special care, since it bypasses many of Hydro's core
1501    /// correctness checks. In particular, the user must ensure that every possible
1502    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
1503    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
1504    /// collection. This ensures that the simulator does not miss any possible outputs.
1505    Cast {
1506        inner: Box<HydroNode>,
1507        metadata: HydroIrMetadata,
1508    },
1509
1510    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
1511    /// interpretation of the input stream.
1512    ///
1513    /// In production, this simply passes through the input, but in simulation, this operator
1514    /// explicitly selects a randomized interpretation.
1515    ObserveNonDet {
1516        inner: Box<HydroNode>,
1517        trusted: bool, // if true, we do not need to simulate non-determinism
1518        metadata: HydroIrMetadata,
1519    },
1520
1521    Source {
1522        source: HydroSource,
1523        metadata: HydroIrMetadata,
1524    },
1525
1526    SingletonSource {
1527        value: DebugExpr,
1528        metadata: HydroIrMetadata,
1529    },
1530
1531    CycleSource {
1532        ident: syn::Ident,
1533        metadata: HydroIrMetadata,
1534    },
1535
1536    Tee {
1537        inner: TeeNode,
1538        metadata: HydroIrMetadata,
1539    },
1540
1541    BeginAtomic {
1542        inner: Box<HydroNode>,
1543        metadata: HydroIrMetadata,
1544    },
1545
1546    EndAtomic {
1547        inner: Box<HydroNode>,
1548        metadata: HydroIrMetadata,
1549    },
1550
1551    Batch {
1552        inner: Box<HydroNode>,
1553        metadata: HydroIrMetadata,
1554    },
1555
1556    YieldConcat {
1557        inner: Box<HydroNode>,
1558        metadata: HydroIrMetadata,
1559    },
1560
1561    Chain {
1562        first: Box<HydroNode>,
1563        second: Box<HydroNode>,
1564        metadata: HydroIrMetadata,
1565    },
1566
1567    ChainFirst {
1568        first: Box<HydroNode>,
1569        second: Box<HydroNode>,
1570        metadata: HydroIrMetadata,
1571    },
1572
1573    CrossProduct {
1574        left: Box<HydroNode>,
1575        right: Box<HydroNode>,
1576        metadata: HydroIrMetadata,
1577    },
1578
1579    CrossSingleton {
1580        left: Box<HydroNode>,
1581        right: Box<HydroNode>,
1582        metadata: HydroIrMetadata,
1583    },
1584
1585    Join {
1586        left: Box<HydroNode>,
1587        right: Box<HydroNode>,
1588        metadata: HydroIrMetadata,
1589    },
1590
1591    Difference {
1592        pos: Box<HydroNode>,
1593        neg: Box<HydroNode>,
1594        metadata: HydroIrMetadata,
1595    },
1596
1597    AntiJoin {
1598        pos: Box<HydroNode>,
1599        neg: Box<HydroNode>,
1600        metadata: HydroIrMetadata,
1601    },
1602
1603    ResolveFutures {
1604        input: Box<HydroNode>,
1605        metadata: HydroIrMetadata,
1606    },
1607    ResolveFuturesOrdered {
1608        input: Box<HydroNode>,
1609        metadata: HydroIrMetadata,
1610    },
1611
1612    Map {
1613        f: DebugExpr,
1614        input: Box<HydroNode>,
1615        metadata: HydroIrMetadata,
1616    },
1617    FlatMap {
1618        f: DebugExpr,
1619        input: Box<HydroNode>,
1620        metadata: HydroIrMetadata,
1621    },
1622    Filter {
1623        f: DebugExpr,
1624        input: Box<HydroNode>,
1625        metadata: HydroIrMetadata,
1626    },
1627    FilterMap {
1628        f: DebugExpr,
1629        input: Box<HydroNode>,
1630        metadata: HydroIrMetadata,
1631    },
1632
1633    DeferTick {
1634        input: Box<HydroNode>,
1635        metadata: HydroIrMetadata,
1636    },
1637    Enumerate {
1638        input: Box<HydroNode>,
1639        metadata: HydroIrMetadata,
1640    },
1641    Inspect {
1642        f: DebugExpr,
1643        input: Box<HydroNode>,
1644        metadata: HydroIrMetadata,
1645    },
1646
1647    Unique {
1648        input: Box<HydroNode>,
1649        metadata: HydroIrMetadata,
1650    },
1651
1652    Sort {
1653        input: Box<HydroNode>,
1654        metadata: HydroIrMetadata,
1655    },
1656    Fold {
1657        init: DebugExpr,
1658        acc: DebugExpr,
1659        input: Box<HydroNode>,
1660        metadata: HydroIrMetadata,
1661    },
1662
1663    Scan {
1664        init: DebugExpr,
1665        acc: DebugExpr,
1666        input: Box<HydroNode>,
1667        metadata: HydroIrMetadata,
1668    },
1669    FoldKeyed {
1670        init: DebugExpr,
1671        acc: DebugExpr,
1672        input: Box<HydroNode>,
1673        metadata: HydroIrMetadata,
1674    },
1675
1676    Reduce {
1677        f: DebugExpr,
1678        input: Box<HydroNode>,
1679        metadata: HydroIrMetadata,
1680    },
1681    ReduceKeyed {
1682        f: DebugExpr,
1683        input: Box<HydroNode>,
1684        metadata: HydroIrMetadata,
1685    },
1686    ReduceKeyedWatermark {
1687        f: DebugExpr,
1688        input: Box<HydroNode>,
1689        watermark: Box<HydroNode>,
1690        metadata: HydroIrMetadata,
1691    },
1692
1693    Network {
1694        serialize_fn: Option<DebugExpr>,
1695        instantiate_fn: DebugInstantiate,
1696        deserialize_fn: Option<DebugExpr>,
1697        input: Box<HydroNode>,
1698        metadata: HydroIrMetadata,
1699    },
1700
1701    ExternalInput {
1702        from_external_id: usize,
1703        from_port_id: ExternalPortId,
1704        from_many: bool,
1705        codec_type: DebugType,
1706        port_hint: NetworkHint,
1707        instantiate_fn: DebugInstantiate,
1708        deserialize_fn: Option<DebugExpr>,
1709        metadata: HydroIrMetadata,
1710    },
1711
1712    Counter {
1713        tag: String,
1714        duration: DebugExpr,
1715        prefix: String,
1716        input: Box<HydroNode>,
1717        metadata: HydroIrMetadata,
1718    },
1719}
1720
1721pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1722pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1723
1724impl HydroNode {
1725    pub fn transform_bottom_up(
1726        &mut self,
1727        transform: &mut impl FnMut(&mut HydroNode),
1728        seen_tees: &mut SeenTees,
1729        check_well_formed: bool,
1730    ) {
1731        self.transform_children(
1732            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1733            seen_tees,
1734        );
1735
1736        transform(self);
1737
1738        let self_location = self.metadata().location_kind.root();
1739
1740        if check_well_formed {
1741            match &*self {
1742                HydroNode::Network { .. } => {}
1743                _ => {
1744                    self.input_metadata().iter().for_each(|i| {
1745                        if i.location_kind.root() != self_location {
1746                            panic!(
1747                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1748                                i,
1749                                i.location_kind.root(),
1750                                self,
1751                                self_location
1752                            )
1753                        }
1754                    });
1755                }
1756            }
1757        }
1758    }
1759
1760    #[inline(always)]
1761    pub fn transform_children(
1762        &mut self,
1763        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1764        seen_tees: &mut SeenTees,
1765    ) {
1766        match self {
1767            HydroNode::Placeholder => {
1768                panic!();
1769            }
1770
1771            HydroNode::Source { .. }
1772            | HydroNode::SingletonSource { .. }
1773            | HydroNode::CycleSource { .. }
1774            | HydroNode::ExternalInput { .. } => {}
1775
1776            HydroNode::Tee { inner, .. } => {
1777                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1778                    *inner = TeeNode(transformed.clone());
1779                } else {
1780                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1781                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1782                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1783                    transform(&mut orig, seen_tees);
1784                    *transformed_cell.borrow_mut() = orig;
1785                    *inner = TeeNode(transformed_cell);
1786                }
1787            }
1788
1789            HydroNode::Cast { inner, .. }
1790            | HydroNode::ObserveNonDet { inner, .. }
1791            | HydroNode::BeginAtomic { inner, .. }
1792            | HydroNode::EndAtomic { inner, .. }
1793            | HydroNode::Batch { inner, .. }
1794            | HydroNode::YieldConcat { inner, .. } => {
1795                transform(inner.as_mut(), seen_tees);
1796            }
1797
1798            HydroNode::Chain { first, second, .. } => {
1799                transform(first.as_mut(), seen_tees);
1800                transform(second.as_mut(), seen_tees);
1801            }
1802
1803            HydroNode::ChainFirst { first, second, .. } => {
1804                transform(first.as_mut(), seen_tees);
1805                transform(second.as_mut(), seen_tees);
1806            }
1807
1808            HydroNode::CrossSingleton { left, right, .. }
1809            | HydroNode::CrossProduct { left, right, .. }
1810            | HydroNode::Join { left, right, .. } => {
1811                transform(left.as_mut(), seen_tees);
1812                transform(right.as_mut(), seen_tees);
1813            }
1814
1815            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1816                transform(pos.as_mut(), seen_tees);
1817                transform(neg.as_mut(), seen_tees);
1818            }
1819
1820            HydroNode::ReduceKeyedWatermark {
1821                input, watermark, ..
1822            } => {
1823                transform(input.as_mut(), seen_tees);
1824                transform(watermark.as_mut(), seen_tees);
1825            }
1826
1827            HydroNode::Map { input, .. }
1828            | HydroNode::ResolveFutures { input, .. }
1829            | HydroNode::ResolveFuturesOrdered { input, .. }
1830            | HydroNode::FlatMap { input, .. }
1831            | HydroNode::Filter { input, .. }
1832            | HydroNode::FilterMap { input, .. }
1833            | HydroNode::Sort { input, .. }
1834            | HydroNode::DeferTick { input, .. }
1835            | HydroNode::Enumerate { input, .. }
1836            | HydroNode::Inspect { input, .. }
1837            | HydroNode::Unique { input, .. }
1838            | HydroNode::Network { input, .. }
1839            | HydroNode::Fold { input, .. }
1840            | HydroNode::Scan { input, .. }
1841            | HydroNode::FoldKeyed { input, .. }
1842            | HydroNode::Reduce { input, .. }
1843            | HydroNode::ReduceKeyed { input, .. }
1844            | HydroNode::Counter { input, .. } => {
1845                transform(input.as_mut(), seen_tees);
1846            }
1847        }
1848    }
1849
1850    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1851        match self {
1852            HydroNode::Placeholder => HydroNode::Placeholder,
1853            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1854                inner: Box::new(inner.deep_clone(seen_tees)),
1855                metadata: metadata.clone(),
1856            },
1857            HydroNode::ObserveNonDet {
1858                inner,
1859                trusted,
1860                metadata,
1861            } => HydroNode::ObserveNonDet {
1862                inner: Box::new(inner.deep_clone(seen_tees)),
1863                trusted: *trusted,
1864                metadata: metadata.clone(),
1865            },
1866            HydroNode::Source { source, metadata } => HydroNode::Source {
1867                source: source.clone(),
1868                metadata: metadata.clone(),
1869            },
1870            HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1871                value: value.clone(),
1872                metadata: metadata.clone(),
1873            },
1874            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1875                ident: ident.clone(),
1876                metadata: metadata.clone(),
1877            },
1878            HydroNode::Tee { inner, metadata } => {
1879                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1880                    HydroNode::Tee {
1881                        inner: TeeNode(transformed.clone()),
1882                        metadata: metadata.clone(),
1883                    }
1884                } else {
1885                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1886                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1887                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1888                    *new_rc.borrow_mut() = cloned;
1889                    HydroNode::Tee {
1890                        inner: TeeNode(new_rc),
1891                        metadata: metadata.clone(),
1892                    }
1893                }
1894            }
1895            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1896                inner: Box::new(inner.deep_clone(seen_tees)),
1897                metadata: metadata.clone(),
1898            },
1899            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1900                inner: Box::new(inner.deep_clone(seen_tees)),
1901                metadata: metadata.clone(),
1902            },
1903            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1904                inner: Box::new(inner.deep_clone(seen_tees)),
1905                metadata: metadata.clone(),
1906            },
1907            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1908                inner: Box::new(inner.deep_clone(seen_tees)),
1909                metadata: metadata.clone(),
1910            },
1911            HydroNode::Chain {
1912                first,
1913                second,
1914                metadata,
1915            } => HydroNode::Chain {
1916                first: Box::new(first.deep_clone(seen_tees)),
1917                second: Box::new(second.deep_clone(seen_tees)),
1918                metadata: metadata.clone(),
1919            },
1920            HydroNode::ChainFirst {
1921                first,
1922                second,
1923                metadata,
1924            } => HydroNode::ChainFirst {
1925                first: Box::new(first.deep_clone(seen_tees)),
1926                second: Box::new(second.deep_clone(seen_tees)),
1927                metadata: metadata.clone(),
1928            },
1929            HydroNode::CrossProduct {
1930                left,
1931                right,
1932                metadata,
1933            } => HydroNode::CrossProduct {
1934                left: Box::new(left.deep_clone(seen_tees)),
1935                right: Box::new(right.deep_clone(seen_tees)),
1936                metadata: metadata.clone(),
1937            },
1938            HydroNode::CrossSingleton {
1939                left,
1940                right,
1941                metadata,
1942            } => HydroNode::CrossSingleton {
1943                left: Box::new(left.deep_clone(seen_tees)),
1944                right: Box::new(right.deep_clone(seen_tees)),
1945                metadata: metadata.clone(),
1946            },
1947            HydroNode::Join {
1948                left,
1949                right,
1950                metadata,
1951            } => HydroNode::Join {
1952                left: Box::new(left.deep_clone(seen_tees)),
1953                right: Box::new(right.deep_clone(seen_tees)),
1954                metadata: metadata.clone(),
1955            },
1956            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1957                pos: Box::new(pos.deep_clone(seen_tees)),
1958                neg: Box::new(neg.deep_clone(seen_tees)),
1959                metadata: metadata.clone(),
1960            },
1961            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1962                pos: Box::new(pos.deep_clone(seen_tees)),
1963                neg: Box::new(neg.deep_clone(seen_tees)),
1964                metadata: metadata.clone(),
1965            },
1966            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1967                input: Box::new(input.deep_clone(seen_tees)),
1968                metadata: metadata.clone(),
1969            },
1970            HydroNode::ResolveFuturesOrdered { input, metadata } => {
1971                HydroNode::ResolveFuturesOrdered {
1972                    input: Box::new(input.deep_clone(seen_tees)),
1973                    metadata: metadata.clone(),
1974                }
1975            }
1976            HydroNode::Map { f, input, metadata } => HydroNode::Map {
1977                f: f.clone(),
1978                input: Box::new(input.deep_clone(seen_tees)),
1979                metadata: metadata.clone(),
1980            },
1981            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1982                f: f.clone(),
1983                input: Box::new(input.deep_clone(seen_tees)),
1984                metadata: metadata.clone(),
1985            },
1986            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1987                f: f.clone(),
1988                input: Box::new(input.deep_clone(seen_tees)),
1989                metadata: metadata.clone(),
1990            },
1991            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1992                f: f.clone(),
1993                input: Box::new(input.deep_clone(seen_tees)),
1994                metadata: metadata.clone(),
1995            },
1996            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1997                input: Box::new(input.deep_clone(seen_tees)),
1998                metadata: metadata.clone(),
1999            },
2000            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2001                input: Box::new(input.deep_clone(seen_tees)),
2002                metadata: metadata.clone(),
2003            },
2004            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2005                f: f.clone(),
2006                input: Box::new(input.deep_clone(seen_tees)),
2007                metadata: metadata.clone(),
2008            },
2009            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2010                input: Box::new(input.deep_clone(seen_tees)),
2011                metadata: metadata.clone(),
2012            },
2013            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2014                input: Box::new(input.deep_clone(seen_tees)),
2015                metadata: metadata.clone(),
2016            },
2017            HydroNode::Fold {
2018                init,
2019                acc,
2020                input,
2021                metadata,
2022            } => HydroNode::Fold {
2023                init: init.clone(),
2024                acc: acc.clone(),
2025                input: Box::new(input.deep_clone(seen_tees)),
2026                metadata: metadata.clone(),
2027            },
2028            HydroNode::Scan {
2029                init,
2030                acc,
2031                input,
2032                metadata,
2033            } => HydroNode::Scan {
2034                init: init.clone(),
2035                acc: acc.clone(),
2036                input: Box::new(input.deep_clone(seen_tees)),
2037                metadata: metadata.clone(),
2038            },
2039            HydroNode::FoldKeyed {
2040                init,
2041                acc,
2042                input,
2043                metadata,
2044            } => HydroNode::FoldKeyed {
2045                init: init.clone(),
2046                acc: acc.clone(),
2047                input: Box::new(input.deep_clone(seen_tees)),
2048                metadata: metadata.clone(),
2049            },
2050            HydroNode::ReduceKeyedWatermark {
2051                f,
2052                input,
2053                watermark,
2054                metadata,
2055            } => HydroNode::ReduceKeyedWatermark {
2056                f: f.clone(),
2057                input: Box::new(input.deep_clone(seen_tees)),
2058                watermark: Box::new(watermark.deep_clone(seen_tees)),
2059                metadata: metadata.clone(),
2060            },
2061            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2062                f: f.clone(),
2063                input: Box::new(input.deep_clone(seen_tees)),
2064                metadata: metadata.clone(),
2065            },
2066            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2067                f: f.clone(),
2068                input: Box::new(input.deep_clone(seen_tees)),
2069                metadata: metadata.clone(),
2070            },
2071            HydroNode::Network {
2072                serialize_fn,
2073                instantiate_fn,
2074                deserialize_fn,
2075                input,
2076                metadata,
2077            } => HydroNode::Network {
2078                serialize_fn: serialize_fn.clone(),
2079                instantiate_fn: instantiate_fn.clone(),
2080                deserialize_fn: deserialize_fn.clone(),
2081                input: Box::new(input.deep_clone(seen_tees)),
2082                metadata: metadata.clone(),
2083            },
2084            HydroNode::ExternalInput {
2085                from_external_id,
2086                from_port_id,
2087                from_many,
2088                codec_type,
2089                port_hint,
2090                instantiate_fn,
2091                deserialize_fn,
2092                metadata,
2093            } => HydroNode::ExternalInput {
2094                from_external_id: *from_external_id,
2095                from_port_id: *from_port_id,
2096                from_many: *from_many,
2097                codec_type: codec_type.clone(),
2098                port_hint: *port_hint,
2099                instantiate_fn: instantiate_fn.clone(),
2100                deserialize_fn: deserialize_fn.clone(),
2101                metadata: metadata.clone(),
2102            },
2103            HydroNode::Counter {
2104                tag,
2105                duration,
2106                prefix,
2107                input,
2108                metadata,
2109            } => HydroNode::Counter {
2110                tag: tag.clone(),
2111                duration: duration.clone(),
2112                prefix: prefix.clone(),
2113                input: Box::new(input.deep_clone(seen_tees)),
2114                metadata: metadata.clone(),
2115            },
2116        }
2117    }
2118
2119    #[cfg(feature = "build")]
2120    pub fn emit_core<'a, D: Deploy<'a>>(
2121        &mut self,
2122        builders_or_callback: &mut BuildersOrCallback<
2123            impl FnMut(&mut HydroRoot, &mut usize),
2124            impl FnMut(&mut HydroNode, &mut usize),
2125        >,
2126        seen_tees: &mut SeenTees,
2127        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2128        next_stmt_id: &mut usize,
2129    ) -> syn::Ident {
2130        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2131
2132        self.transform_bottom_up(
2133            &mut |node: &mut HydroNode| {
2134                let out_location = node.metadata().location_kind.clone();
2135                match node {
2136                    HydroNode::Placeholder => {
2137                        panic!()
2138                    }
2139
2140                    HydroNode::Cast { .. } => {
2141                        // Cast passes through the input ident unchanged
2142                        // The input ident is already on the stack from processing the child
2143                        match builders_or_callback {
2144                            BuildersOrCallback::Builders(_) => {}
2145                            BuildersOrCallback::Callback(_, node_callback) => {
2146                                node_callback(node, next_stmt_id);
2147                            }
2148                        }
2149
2150                        *next_stmt_id += 1;
2151                        // input_ident stays on stack as output
2152                    }
2153
2154                    HydroNode::ObserveNonDet {
2155                        inner,
2156                        trusted,
2157                        metadata,
2158                        ..
2159                    } => {
2160                        let inner_ident = ident_stack.pop().unwrap();
2161
2162                        let observe_ident =
2163                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2164
2165                        match builders_or_callback {
2166                            BuildersOrCallback::Builders(graph_builders) => {
2167                                graph_builders.observe_nondet(
2168                                    *trusted,
2169                                    &inner.metadata().location_kind,
2170                                    inner_ident,
2171                                    &inner.metadata().collection_kind,
2172                                    &observe_ident,
2173                                    &metadata.collection_kind,
2174                                    &metadata.op,
2175                                );
2176                            }
2177                            BuildersOrCallback::Callback(_, node_callback) => {
2178                                node_callback(node, next_stmt_id);
2179                            }
2180                        }
2181
2182                        *next_stmt_id += 1;
2183
2184                        ident_stack.push(observe_ident);
2185                    }
2186
2187                    HydroNode::Batch {
2188                        inner, metadata, ..
2189                    } => {
2190                        let inner_ident = ident_stack.pop().unwrap();
2191
2192                        let batch_ident =
2193                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2194
2195                        match builders_or_callback {
2196                            BuildersOrCallback::Builders(graph_builders) => {
2197                                graph_builders.batch(
2198                                    inner_ident,
2199                                    &inner.metadata().location_kind,
2200                                    &inner.metadata().collection_kind,
2201                                    &batch_ident,
2202                                    &out_location,
2203                                    &metadata.op,
2204                                );
2205                            }
2206                            BuildersOrCallback::Callback(_, node_callback) => {
2207                                node_callback(node, next_stmt_id);
2208                            }
2209                        }
2210
2211                        *next_stmt_id += 1;
2212
2213                        ident_stack.push(batch_ident);
2214                    }
2215
2216                    HydroNode::YieldConcat { inner, .. } => {
2217                        let inner_ident = ident_stack.pop().unwrap();
2218
2219                        let yield_ident =
2220                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2221
2222                        match builders_or_callback {
2223                            BuildersOrCallback::Builders(graph_builders) => {
2224                                graph_builders.yield_from_tick(
2225                                    inner_ident,
2226                                    &inner.metadata().location_kind,
2227                                    &inner.metadata().collection_kind,
2228                                    &yield_ident,
2229                                    &out_location,
2230                                );
2231                            }
2232                            BuildersOrCallback::Callback(_, node_callback) => {
2233                                node_callback(node, next_stmt_id);
2234                            }
2235                        }
2236
2237                        *next_stmt_id += 1;
2238
2239                        ident_stack.push(yield_ident);
2240                    }
2241
2242                    HydroNode::BeginAtomic { inner, metadata } => {
2243                        let inner_ident = ident_stack.pop().unwrap();
2244
2245                        let begin_ident =
2246                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2247
2248                        match builders_or_callback {
2249                            BuildersOrCallback::Builders(graph_builders) => {
2250                                graph_builders.begin_atomic(
2251                                    inner_ident,
2252                                    &inner.metadata().location_kind,
2253                                    &inner.metadata().collection_kind,
2254                                    &begin_ident,
2255                                    &out_location,
2256                                    &metadata.op,
2257                                );
2258                            }
2259                            BuildersOrCallback::Callback(_, node_callback) => {
2260                                node_callback(node, next_stmt_id);
2261                            }
2262                        }
2263
2264                        *next_stmt_id += 1;
2265
2266                        ident_stack.push(begin_ident);
2267                    }
2268
2269                    HydroNode::EndAtomic { inner, .. } => {
2270                        let inner_ident = ident_stack.pop().unwrap();
2271
2272                        let end_ident =
2273                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2274
2275                        match builders_or_callback {
2276                            BuildersOrCallback::Builders(graph_builders) => {
2277                                graph_builders.end_atomic(
2278                                    inner_ident,
2279                                    &inner.metadata().location_kind,
2280                                    &inner.metadata().collection_kind,
2281                                    &end_ident,
2282                                );
2283                            }
2284                            BuildersOrCallback::Callback(_, node_callback) => {
2285                                node_callback(node, next_stmt_id);
2286                            }
2287                        }
2288
2289                        *next_stmt_id += 1;
2290
2291                        ident_stack.push(end_ident);
2292                    }
2293
2294                    HydroNode::Source {
2295                        source, metadata, ..
2296                    } => {
2297                        if let HydroSource::ExternalNetwork() = source {
2298                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2299                        } else {
2300                            let source_ident =
2301                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2302
2303                            let source_stmt = match source {
2304                                HydroSource::Stream(expr) => {
2305                                    debug_assert!(metadata.location_kind.is_top_level());
2306                                    parse_quote! {
2307                                        #source_ident = source_stream(#expr);
2308                                    }
2309                                }
2310
2311                                HydroSource::ExternalNetwork() => {
2312                                    unreachable!()
2313                                }
2314
2315                                HydroSource::Iter(expr) => {
2316                                    if metadata.location_kind.is_top_level() {
2317                                        parse_quote! {
2318                                            #source_ident = source_iter(#expr);
2319                                        }
2320                                    } else {
2321                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
2322                                        parse_quote! {
2323                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2324                                        }
2325                                    }
2326                                }
2327
2328                                HydroSource::Spin() => {
2329                                    debug_assert!(metadata.location_kind.is_top_level());
2330                                    parse_quote! {
2331                                        #source_ident = spin();
2332                                    }
2333                                }
2334
2335                                HydroSource::ClusterMembers(location_id) => {
2336                                    debug_assert!(metadata.location_kind.is_top_level());
2337
2338                                    let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2339                                        D::cluster_membership_stream(location_id),
2340                                        &(),
2341                                    );
2342
2343                                    parse_quote! {
2344                                        #source_ident = source_stream(#expr);
2345                                    }
2346                                }
2347                            };
2348
2349                            match builders_or_callback {
2350                                BuildersOrCallback::Builders(graph_builders) => {
2351                                    let builder = graph_builders.get_dfir_mut(&out_location);
2352                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2353                                }
2354                                BuildersOrCallback::Callback(_, node_callback) => {
2355                                    node_callback(node, next_stmt_id);
2356                                }
2357                            }
2358
2359                            *next_stmt_id += 1;
2360
2361                            ident_stack.push(source_ident);
2362                        }
2363                    }
2364
2365                    HydroNode::SingletonSource { value, metadata } => {
2366                        let source_ident =
2367                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2368
2369                        match builders_or_callback {
2370                            BuildersOrCallback::Builders(graph_builders) => {
2371                                let builder = graph_builders.get_dfir_mut(&out_location);
2372
2373                                if metadata.location_kind.is_top_level()
2374                                    && metadata.collection_kind.is_bounded()
2375                                {
2376                                    builder.add_dfir(
2377                                        parse_quote! {
2378                                            #source_ident = source_iter([#value]);
2379                                        },
2380                                        None,
2381                                        Some(&next_stmt_id.to_string()),
2382                                    );
2383                                } else {
2384                                    builder.add_dfir(
2385                                        parse_quote! {
2386                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2387                                        },
2388                                        None,
2389                                        Some(&next_stmt_id.to_string()),
2390                                    );
2391                                }
2392                            }
2393                            BuildersOrCallback::Callback(_, node_callback) => {
2394                                node_callback(node, next_stmt_id);
2395                            }
2396                        }
2397
2398                        *next_stmt_id += 1;
2399
2400                        ident_stack.push(source_ident);
2401                    }
2402
2403                    HydroNode::CycleSource { ident, .. } => {
2404                        let ident = ident.clone();
2405
2406                        match builders_or_callback {
2407                            BuildersOrCallback::Builders(_) => {}
2408                            BuildersOrCallback::Callback(_, node_callback) => {
2409                                node_callback(node, next_stmt_id);
2410                            }
2411                        }
2412
2413                        // consume a stmt id even though we did not emit anything so that we can instrument this
2414                        *next_stmt_id += 1;
2415
2416                        ident_stack.push(ident);
2417                    }
2418
2419                    HydroNode::Tee { inner, .. } => {
2420                        let ret_ident = if let Some(teed_from) =
2421                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2422                        {
2423                            match builders_or_callback {
2424                                BuildersOrCallback::Builders(_) => {}
2425                                BuildersOrCallback::Callback(_, node_callback) => {
2426                                    node_callback(node, next_stmt_id);
2427                                }
2428                            }
2429
2430                            teed_from.clone()
2431                        } else {
2432                            // The inner node was already processed by transform_bottom_up,
2433                            // so its ident is on the stack
2434                            let inner_ident = ident_stack.pop().unwrap();
2435
2436                            let tee_ident =
2437                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2438
2439                            built_tees.insert(
2440                                inner.0.as_ref() as *const RefCell<HydroNode>,
2441                                tee_ident.clone(),
2442                            );
2443
2444                            match builders_or_callback {
2445                                BuildersOrCallback::Builders(graph_builders) => {
2446                                    let builder = graph_builders.get_dfir_mut(&out_location);
2447                                    builder.add_dfir(
2448                                        parse_quote! {
2449                                            #tee_ident = #inner_ident -> tee();
2450                                        },
2451                                        None,
2452                                        Some(&next_stmt_id.to_string()),
2453                                    );
2454                                }
2455                                BuildersOrCallback::Callback(_, node_callback) => {
2456                                    node_callback(node, next_stmt_id);
2457                                }
2458                            }
2459
2460                            tee_ident
2461                        };
2462
2463                        // we consume a stmt id regardless of if we emit the tee() operator,
2464                        // so that during rewrites we touch all recipients of the tee()
2465
2466                        *next_stmt_id += 1;
2467                        ident_stack.push(ret_ident);
2468                    }
2469
2470                    HydroNode::Chain { .. } => {
2471                        // Children are processed left-to-right, so second is on top
2472                        let second_ident = ident_stack.pop().unwrap();
2473                        let first_ident = ident_stack.pop().unwrap();
2474
2475                        let chain_ident =
2476                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2477
2478                        match builders_or_callback {
2479                            BuildersOrCallback::Builders(graph_builders) => {
2480                                let builder = graph_builders.get_dfir_mut(&out_location);
2481                                builder.add_dfir(
2482                                    parse_quote! {
2483                                        #chain_ident = chain();
2484                                        #first_ident -> [0]#chain_ident;
2485                                        #second_ident -> [1]#chain_ident;
2486                                    },
2487                                    None,
2488                                    Some(&next_stmt_id.to_string()),
2489                                );
2490                            }
2491                            BuildersOrCallback::Callback(_, node_callback) => {
2492                                node_callback(node, next_stmt_id);
2493                            }
2494                        }
2495
2496                        *next_stmt_id += 1;
2497
2498                        ident_stack.push(chain_ident);
2499                    }
2500
2501                    HydroNode::ChainFirst { .. } => {
2502                        let second_ident = ident_stack.pop().unwrap();
2503                        let first_ident = ident_stack.pop().unwrap();
2504
2505                        let chain_ident =
2506                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2507
2508                        match builders_or_callback {
2509                            BuildersOrCallback::Builders(graph_builders) => {
2510                                let builder = graph_builders.get_dfir_mut(&out_location);
2511                                builder.add_dfir(
2512                                    parse_quote! {
2513                                        #chain_ident = chain_first_n(1);
2514                                        #first_ident -> [0]#chain_ident;
2515                                        #second_ident -> [1]#chain_ident;
2516                                    },
2517                                    None,
2518                                    Some(&next_stmt_id.to_string()),
2519                                );
2520                            }
2521                            BuildersOrCallback::Callback(_, node_callback) => {
2522                                node_callback(node, next_stmt_id);
2523                            }
2524                        }
2525
2526                        *next_stmt_id += 1;
2527
2528                        ident_stack.push(chain_ident);
2529                    }
2530
2531                    HydroNode::CrossSingleton { right, .. } => {
2532                        let right_ident = ident_stack.pop().unwrap();
2533                        let left_ident = ident_stack.pop().unwrap();
2534
2535                        let cross_ident =
2536                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2537
2538                        match builders_or_callback {
2539                            BuildersOrCallback::Builders(graph_builders) => {
2540                                let builder = graph_builders.get_dfir_mut(&out_location);
2541
2542                                if right.metadata().location_kind.is_top_level()
2543                                    && right.metadata().collection_kind.is_bounded()
2544                                {
2545                                    builder.add_dfir(
2546                                        parse_quote! {
2547                                            #cross_ident = cross_singleton();
2548                                            #left_ident -> [input]#cross_ident;
2549                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
2550                                        },
2551                                        None,
2552                                        Some(&next_stmt_id.to_string()),
2553                                    );
2554                                } else {
2555                                    builder.add_dfir(
2556                                        parse_quote! {
2557                                            #cross_ident = cross_singleton();
2558                                            #left_ident -> [input]#cross_ident;
2559                                            #right_ident -> [single]#cross_ident;
2560                                        },
2561                                        None,
2562                                        Some(&next_stmt_id.to_string()),
2563                                    );
2564                                }
2565                            }
2566                            BuildersOrCallback::Callback(_, node_callback) => {
2567                                node_callback(node, next_stmt_id);
2568                            }
2569                        }
2570
2571                        *next_stmt_id += 1;
2572
2573                        ident_stack.push(cross_ident);
2574                    }
2575
2576                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2577                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2578                            parse_quote!(cross_join_multiset)
2579                        } else {
2580                            parse_quote!(join_multiset)
2581                        };
2582
2583                        let (HydroNode::CrossProduct { left, right, .. }
2584                        | HydroNode::Join { left, right, .. }) = node
2585                        else {
2586                            unreachable!()
2587                        };
2588
2589                        let is_top_level = left.metadata().location_kind.is_top_level()
2590                            && right.metadata().location_kind.is_top_level();
2591                        let left_lifetime = if left.metadata().location_kind.is_top_level() {
2592                            quote!('static)
2593                        } else {
2594                            quote!('tick)
2595                        };
2596
2597                        let right_lifetime = if right.metadata().location_kind.is_top_level() {
2598                            quote!('static)
2599                        } else {
2600                            quote!('tick)
2601                        };
2602
2603                        let right_ident = ident_stack.pop().unwrap();
2604                        let left_ident = ident_stack.pop().unwrap();
2605
2606                        let stream_ident =
2607                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2608
2609                        match builders_or_callback {
2610                            BuildersOrCallback::Builders(graph_builders) => {
2611                                let builder = graph_builders.get_dfir_mut(&out_location);
2612                                builder.add_dfir(
2613                                    if is_top_level {
2614                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
2615                                        // a multiset_delta() to negate the replay behavior
2616                                        parse_quote! {
2617                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2618                                            #left_ident -> [0]#stream_ident;
2619                                            #right_ident -> [1]#stream_ident;
2620                                        }
2621                                    } else {
2622                                        parse_quote! {
2623                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2624                                            #left_ident -> [0]#stream_ident;
2625                                            #right_ident -> [1]#stream_ident;
2626                                        }
2627                                    }
2628                                    ,
2629                                    None,
2630                                    Some(&next_stmt_id.to_string()),
2631                                );
2632                            }
2633                            BuildersOrCallback::Callback(_, node_callback) => {
2634                                node_callback(node, next_stmt_id);
2635                            }
2636                        }
2637
2638                        *next_stmt_id += 1;
2639
2640                        ident_stack.push(stream_ident);
2641                    }
2642
2643                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2644                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2645                            parse_quote!(difference)
2646                        } else {
2647                            parse_quote!(anti_join)
2648                        };
2649
2650                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2651                            node
2652                        else {
2653                            unreachable!()
2654                        };
2655
2656                        let neg_lifetime = if neg.metadata().location_kind.is_top_level() {
2657                            quote!('static)
2658                        } else {
2659                            quote!('tick)
2660                        };
2661
2662                        let neg_ident = ident_stack.pop().unwrap();
2663                        let pos_ident = ident_stack.pop().unwrap();
2664
2665                        let stream_ident =
2666                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2667
2668                        match builders_or_callback {
2669                            BuildersOrCallback::Builders(graph_builders) => {
2670                                let builder = graph_builders.get_dfir_mut(&out_location);
2671                                builder.add_dfir(
2672                                    parse_quote! {
2673                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
2674                                        #pos_ident -> [pos]#stream_ident;
2675                                        #neg_ident -> [neg]#stream_ident;
2676                                    },
2677                                    None,
2678                                    Some(&next_stmt_id.to_string()),
2679                                );
2680                            }
2681                            BuildersOrCallback::Callback(_, node_callback) => {
2682                                node_callback(node, next_stmt_id);
2683                            }
2684                        }
2685
2686                        *next_stmt_id += 1;
2687
2688                        ident_stack.push(stream_ident);
2689                    }
2690
2691                    HydroNode::ResolveFutures { .. } => {
2692                        let input_ident = ident_stack.pop().unwrap();
2693
2694                        let futures_ident =
2695                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2696
2697                        match builders_or_callback {
2698                            BuildersOrCallback::Builders(graph_builders) => {
2699                                let builder = graph_builders.get_dfir_mut(&out_location);
2700                                builder.add_dfir(
2701                                    parse_quote! {
2702                                        #futures_ident = #input_ident -> resolve_futures();
2703                                    },
2704                                    None,
2705                                    Some(&next_stmt_id.to_string()),
2706                                );
2707                            }
2708                            BuildersOrCallback::Callback(_, node_callback) => {
2709                                node_callback(node, next_stmt_id);
2710                            }
2711                        }
2712
2713                        *next_stmt_id += 1;
2714
2715                        ident_stack.push(futures_ident);
2716                    }
2717
2718                    HydroNode::ResolveFuturesOrdered { .. } => {
2719                        let input_ident = ident_stack.pop().unwrap();
2720
2721                        let futures_ident =
2722                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2723
2724                        match builders_or_callback {
2725                            BuildersOrCallback::Builders(graph_builders) => {
2726                                let builder = graph_builders.get_dfir_mut(&out_location);
2727                                builder.add_dfir(
2728                                    parse_quote! {
2729                                        #futures_ident = #input_ident -> resolve_futures_ordered();
2730                                    },
2731                                    None,
2732                                    Some(&next_stmt_id.to_string()),
2733                                );
2734                            }
2735                            BuildersOrCallback::Callback(_, node_callback) => {
2736                                node_callback(node, next_stmt_id);
2737                            }
2738                        }
2739
2740                        *next_stmt_id += 1;
2741
2742                        ident_stack.push(futures_ident);
2743                    }
2744
2745                    HydroNode::Map { f, .. } => {
2746                        let input_ident = ident_stack.pop().unwrap();
2747
2748                        let map_ident =
2749                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2750
2751                        match builders_or_callback {
2752                            BuildersOrCallback::Builders(graph_builders) => {
2753                                let builder = graph_builders.get_dfir_mut(&out_location);
2754                                builder.add_dfir(
2755                                    parse_quote! {
2756                                        #map_ident = #input_ident -> map(#f);
2757                                    },
2758                                    None,
2759                                    Some(&next_stmt_id.to_string()),
2760                                );
2761                            }
2762                            BuildersOrCallback::Callback(_, node_callback) => {
2763                                node_callback(node, next_stmt_id);
2764                            }
2765                        }
2766
2767                        *next_stmt_id += 1;
2768
2769                        ident_stack.push(map_ident);
2770                    }
2771
2772                    HydroNode::FlatMap { f, .. } => {
2773                        let input_ident = ident_stack.pop().unwrap();
2774
2775                        let flat_map_ident =
2776                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2777
2778                        match builders_or_callback {
2779                            BuildersOrCallback::Builders(graph_builders) => {
2780                                let builder = graph_builders.get_dfir_mut(&out_location);
2781                                builder.add_dfir(
2782                                    parse_quote! {
2783                                        #flat_map_ident = #input_ident -> flat_map(#f);
2784                                    },
2785                                    None,
2786                                    Some(&next_stmt_id.to_string()),
2787                                );
2788                            }
2789                            BuildersOrCallback::Callback(_, node_callback) => {
2790                                node_callback(node, next_stmt_id);
2791                            }
2792                        }
2793
2794                        *next_stmt_id += 1;
2795
2796                        ident_stack.push(flat_map_ident);
2797                    }
2798
2799                    HydroNode::Filter { f, .. } => {
2800                        let input_ident = ident_stack.pop().unwrap();
2801
2802                        let filter_ident =
2803                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2804
2805                        match builders_or_callback {
2806                            BuildersOrCallback::Builders(graph_builders) => {
2807                                let builder = graph_builders.get_dfir_mut(&out_location);
2808                                builder.add_dfir(
2809                                    parse_quote! {
2810                                        #filter_ident = #input_ident -> filter(#f);
2811                                    },
2812                                    None,
2813                                    Some(&next_stmt_id.to_string()),
2814                                );
2815                            }
2816                            BuildersOrCallback::Callback(_, node_callback) => {
2817                                node_callback(node, next_stmt_id);
2818                            }
2819                        }
2820
2821                        *next_stmt_id += 1;
2822
2823                        ident_stack.push(filter_ident);
2824                    }
2825
2826                    HydroNode::FilterMap { f, .. } => {
2827                        let input_ident = ident_stack.pop().unwrap();
2828
2829                        let filter_map_ident =
2830                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2831
2832                        match builders_or_callback {
2833                            BuildersOrCallback::Builders(graph_builders) => {
2834                                let builder = graph_builders.get_dfir_mut(&out_location);
2835                                builder.add_dfir(
2836                                    parse_quote! {
2837                                        #filter_map_ident = #input_ident -> filter_map(#f);
2838                                    },
2839                                    None,
2840                                    Some(&next_stmt_id.to_string()),
2841                                );
2842                            }
2843                            BuildersOrCallback::Callback(_, node_callback) => {
2844                                node_callback(node, next_stmt_id);
2845                            }
2846                        }
2847
2848                        *next_stmt_id += 1;
2849
2850                        ident_stack.push(filter_map_ident);
2851                    }
2852
2853                    HydroNode::Sort { .. } => {
2854                        let input_ident = ident_stack.pop().unwrap();
2855
2856                        let sort_ident =
2857                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2858
2859                        match builders_or_callback {
2860                            BuildersOrCallback::Builders(graph_builders) => {
2861                                let builder = graph_builders.get_dfir_mut(&out_location);
2862                                builder.add_dfir(
2863                                    parse_quote! {
2864                                        #sort_ident = #input_ident -> sort();
2865                                    },
2866                                    None,
2867                                    Some(&next_stmt_id.to_string()),
2868                                );
2869                            }
2870                            BuildersOrCallback::Callback(_, node_callback) => {
2871                                node_callback(node, next_stmt_id);
2872                            }
2873                        }
2874
2875                        *next_stmt_id += 1;
2876
2877                        ident_stack.push(sort_ident);
2878                    }
2879
2880                    HydroNode::DeferTick { .. } => {
2881                        let input_ident = ident_stack.pop().unwrap();
2882
2883                        let defer_tick_ident =
2884                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2885
2886                        match builders_or_callback {
2887                            BuildersOrCallback::Builders(graph_builders) => {
2888                                let builder = graph_builders.get_dfir_mut(&out_location);
2889                                builder.add_dfir(
2890                                    parse_quote! {
2891                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
2892                                    },
2893                                    None,
2894                                    Some(&next_stmt_id.to_string()),
2895                                );
2896                            }
2897                            BuildersOrCallback::Callback(_, node_callback) => {
2898                                node_callback(node, next_stmt_id);
2899                            }
2900                        }
2901
2902                        *next_stmt_id += 1;
2903
2904                        ident_stack.push(defer_tick_ident);
2905                    }
2906
2907                    HydroNode::Enumerate { input, .. } => {
2908                        let input_ident = ident_stack.pop().unwrap();
2909
2910                        let enumerate_ident =
2911                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2912
2913                        match builders_or_callback {
2914                            BuildersOrCallback::Builders(graph_builders) => {
2915                                let builder = graph_builders.get_dfir_mut(&out_location);
2916                                let lifetime = if input.metadata().location_kind.is_top_level() {
2917                                    quote!('static)
2918                                } else {
2919                                    quote!('tick)
2920                                };
2921                                builder.add_dfir(
2922                                    parse_quote! {
2923                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2924                                    },
2925                                    None,
2926                                    Some(&next_stmt_id.to_string()),
2927                                );
2928                            }
2929                            BuildersOrCallback::Callback(_, node_callback) => {
2930                                node_callback(node, next_stmt_id);
2931                            }
2932                        }
2933
2934                        *next_stmt_id += 1;
2935
2936                        ident_stack.push(enumerate_ident);
2937                    }
2938
2939                    HydroNode::Inspect { f, .. } => {
2940                        let input_ident = ident_stack.pop().unwrap();
2941
2942                        let inspect_ident =
2943                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2944
2945                        match builders_or_callback {
2946                            BuildersOrCallback::Builders(graph_builders) => {
2947                                let builder = graph_builders.get_dfir_mut(&out_location);
2948                                builder.add_dfir(
2949                                    parse_quote! {
2950                                        #inspect_ident = #input_ident -> inspect(#f);
2951                                    },
2952                                    None,
2953                                    Some(&next_stmt_id.to_string()),
2954                                );
2955                            }
2956                            BuildersOrCallback::Callback(_, node_callback) => {
2957                                node_callback(node, next_stmt_id);
2958                            }
2959                        }
2960
2961                        *next_stmt_id += 1;
2962
2963                        ident_stack.push(inspect_ident);
2964                    }
2965
2966                    HydroNode::Unique { input, .. } => {
2967                        let input_ident = ident_stack.pop().unwrap();
2968
2969                        let unique_ident =
2970                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2971
2972                        match builders_or_callback {
2973                            BuildersOrCallback::Builders(graph_builders) => {
2974                                let builder = graph_builders.get_dfir_mut(&out_location);
2975                                let lifetime = if input.metadata().location_kind.is_top_level() {
2976                                    quote!('static)
2977                                } else {
2978                                    quote!('tick)
2979                                };
2980
2981                                builder.add_dfir(
2982                                    parse_quote! {
2983                                        #unique_ident = #input_ident -> unique::<#lifetime>();
2984                                    },
2985                                    None,
2986                                    Some(&next_stmt_id.to_string()),
2987                                );
2988                            }
2989                            BuildersOrCallback::Callback(_, node_callback) => {
2990                                node_callback(node, next_stmt_id);
2991                            }
2992                        }
2993
2994                        *next_stmt_id += 1;
2995
2996                        ident_stack.push(unique_ident);
2997                    }
2998
2999                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3000                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3001                            if input.metadata().location_kind.is_top_level()
3002                                && input.metadata().collection_kind.is_bounded()
3003                            {
3004                                parse_quote!(fold_no_replay)
3005                            } else {
3006                                parse_quote!(fold)
3007                            }
3008                        } else if matches!(node, HydroNode::Scan { .. }) {
3009                            parse_quote!(scan)
3010                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3011                            if input.metadata().location_kind.is_top_level()
3012                                && input.metadata().collection_kind.is_bounded()
3013                            {
3014                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3015                            } else {
3016                                parse_quote!(fold_keyed)
3017                            }
3018                        } else {
3019                            unreachable!()
3020                        };
3021
3022                        let (HydroNode::Fold { input, .. }
3023                        | HydroNode::FoldKeyed { input, .. }
3024                        | HydroNode::Scan { input, .. }) = node
3025                        else {
3026                            unreachable!()
3027                        };
3028
3029                        let lifetime = if input.metadata().location_kind.is_top_level() {
3030                            quote!('static)
3031                        } else {
3032                            quote!('tick)
3033                        };
3034
3035                        let input_ident = ident_stack.pop().unwrap();
3036
3037                        let (HydroNode::Fold { init, acc, .. }
3038                        | HydroNode::FoldKeyed { init, acc, .. }
3039                        | HydroNode::Scan { init, acc, .. }) = &*node
3040                        else {
3041                            unreachable!()
3042                        };
3043
3044                        let fold_ident =
3045                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3046
3047                        match builders_or_callback {
3048                            BuildersOrCallback::Builders(graph_builders) => {
3049                                if matches!(node, HydroNode::Fold { .. })
3050                                    && node.metadata().location_kind.is_top_level()
3051                                    && !(matches!(node.metadata().location_kind, LocationId::Atomic(_)))
3052                                    && graph_builders.singleton_intermediates()
3053                                    && !node.metadata().collection_kind.is_bounded()
3054                                {
3055                                    let builder = graph_builders.get_dfir_mut(&out_location);
3056
3057                                    let acc: syn::Expr = parse_quote!({
3058                                        let mut __inner = #acc;
3059                                        move |__state, __value| {
3060                                            __inner(__state, __value);
3061                                            Some(__state.clone())
3062                                        }
3063                                    });
3064
3065                                    builder.add_dfir(
3066                                        parse_quote! {
3067                                            source_iter([(#init)()]) -> [0]#fold_ident;
3068                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3069                                            #fold_ident = chain();
3070                                        },
3071                                        None,
3072                                        Some(&next_stmt_id.to_string()),
3073                                    );
3074                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3075                                    && node.metadata().location_kind.is_top_level()
3076                                    && !(matches!(node.metadata().location_kind, LocationId::Atomic(_)))
3077                                    && graph_builders.singleton_intermediates()
3078                                    && !node.metadata().collection_kind.is_bounded()
3079                                {
3080                                    let builder = graph_builders.get_dfir_mut(&out_location);
3081
3082                                    let acc: syn::Expr = parse_quote!({
3083                                        let mut __init = #init;
3084                                        let mut __inner = #acc;
3085                                        move |__state, __kv: (_, _)| {
3086                                            // TODO(shadaj): we can avoid the clone when the entry exists
3087                                            let __state = __state
3088                                                .entry(::std::clone::Clone::clone(&__kv.0))
3089                                                .or_insert_with(|| (__init)());
3090                                            __inner(__state, __kv.1);
3091                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3092                                        }
3093                                    });
3094
3095                                    builder.add_dfir(
3096                                        parse_quote! {
3097                                            source_iter([(#init)()]) -> [0]#fold_ident;
3098                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3099                                        },
3100                                        None,
3101                                        Some(&next_stmt_id.to_string()),
3102                                    );
3103                                } else {
3104                                    let builder = graph_builders.get_dfir_mut(&out_location);
3105                                    builder.add_dfir(
3106                                        parse_quote! {
3107                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3108                                        },
3109                                        None,
3110                                        Some(&next_stmt_id.to_string()),
3111                                    );
3112                                }
3113                            }
3114                            BuildersOrCallback::Callback(_, node_callback) => {
3115                                node_callback(node, next_stmt_id);
3116                            }
3117                        }
3118
3119                        *next_stmt_id += 1;
3120
3121                        ident_stack.push(fold_ident);
3122                    }
3123
3124                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3125                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3126                            if input.metadata().location_kind.is_top_level()
3127                                && input.metadata().collection_kind.is_bounded()
3128                            {
3129                                parse_quote!(reduce_no_replay)
3130                            } else {
3131                                parse_quote!(reduce)
3132                            }
3133                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3134                            if input.metadata().location_kind.is_top_level()
3135                                && input.metadata().collection_kind.is_bounded()
3136                            {
3137                                todo!(
3138                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3139                                )
3140                            } else {
3141                                parse_quote!(reduce_keyed)
3142                            }
3143                        } else {
3144                            unreachable!()
3145                        };
3146
3147                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3148                        else {
3149                            unreachable!()
3150                        };
3151
3152                        let lifetime = if input.metadata().location_kind.is_top_level() {
3153                            quote!('static)
3154                        } else {
3155                            quote!('tick)
3156                        };
3157
3158                        let input_ident = ident_stack.pop().unwrap();
3159
3160                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3161                        else {
3162                            unreachable!()
3163                        };
3164
3165                        let reduce_ident =
3166                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3167
3168                        match builders_or_callback {
3169                            BuildersOrCallback::Builders(graph_builders) => {
3170                                if matches!(node, HydroNode::Reduce { .. })
3171                                    && node.metadata().location_kind.is_top_level()
3172                                    && !(matches!(node.metadata().location_kind, LocationId::Atomic(_)))
3173                                    && graph_builders.singleton_intermediates()
3174                                    && !node.metadata().collection_kind.is_bounded()
3175                                {
3176                                    todo!(
3177                                        "Reduce with optional intermediates is not yet supported in simulator"
3178                                    );
3179                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3180                                    && node.metadata().location_kind.is_top_level()
3181                                    && !(matches!(node.metadata().location_kind, LocationId::Atomic(_)))
3182                                    && graph_builders.singleton_intermediates()
3183                                    && !node.metadata().collection_kind.is_bounded()
3184                                {
3185                                    todo!(
3186                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3187                                    );
3188                                } else {
3189                                    let builder = graph_builders.get_dfir_mut(&out_location);
3190                                    builder.add_dfir(
3191                                        parse_quote! {
3192                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3193                                        },
3194                                        None,
3195                                        Some(&next_stmt_id.to_string()),
3196                                    );
3197                                }
3198                            }
3199                            BuildersOrCallback::Callback(_, node_callback) => {
3200                                node_callback(node, next_stmt_id);
3201                            }
3202                        }
3203
3204                        *next_stmt_id += 1;
3205
3206                        ident_stack.push(reduce_ident);
3207                    }
3208
3209                    HydroNode::ReduceKeyedWatermark {
3210                        f,
3211                        input,
3212                        metadata,
3213                        ..
3214                    } => {
3215                        let lifetime = if input.metadata().location_kind.is_top_level() {
3216                            quote!('static)
3217                        } else {
3218                            quote!('tick)
3219                        };
3220
3221                        // watermark is processed second, so it's on top
3222                        let watermark_ident = ident_stack.pop().unwrap();
3223                        let input_ident = ident_stack.pop().unwrap();
3224
3225                        let chain_ident = syn::Ident::new(
3226                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3227                            Span::call_site(),
3228                        );
3229
3230                        let fold_ident =
3231                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3232
3233                        let agg_operator: syn::Ident = if input.metadata().location_kind.is_top_level()
3234                            && input.metadata().collection_kind.is_bounded()
3235                        {
3236                            parse_quote!(fold_no_replay)
3237                        } else {
3238                            parse_quote!(fold)
3239                        };
3240
3241                        match builders_or_callback {
3242                            BuildersOrCallback::Builders(graph_builders) => {
3243                                if metadata.location_kind.is_top_level()
3244                                    && !(matches!(metadata.location_kind, LocationId::Atomic(_)))
3245                                    && graph_builders.singleton_intermediates()
3246                                    && !metadata.collection_kind.is_bounded()
3247                                {
3248                                    todo!(
3249                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3250                                    )
3251                                } else {
3252                                    let builder = graph_builders.get_dfir_mut(&out_location);
3253                                    builder.add_dfir(
3254                                        parse_quote! {
3255                                            #chain_ident = chain();
3256                                            #input_ident
3257                                                -> map(|x| (Some(x), None))
3258                                                -> [0]#chain_ident;
3259                                            #watermark_ident
3260                                                -> map(|watermark| (None, Some(watermark)))
3261                                                -> [1]#chain_ident;
3262
3263                                            #fold_ident = #chain_ident
3264                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3265                                                    let __reduce_keyed_fn = #f;
3266                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3267                                                        if let Some((k, v)) = opt_payload {
3268                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3269                                                                if k <= curr_watermark {
3270                                                                    return;
3271                                                                }
3272                                                            }
3273                                                            match map.entry(k) {
3274                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3275                                                                    e.insert(v);
3276                                                                }
3277                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3278                                                                    __reduce_keyed_fn(e.get_mut(), v);
3279                                                                }
3280                                                            }
3281                                                        } else {
3282                                                            let watermark = opt_watermark.unwrap();
3283                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3284                                                                if watermark <= curr_watermark {
3285                                                                    return;
3286                                                                }
3287                                                            }
3288                                                            *opt_curr_watermark = opt_watermark;
3289                                                            map.retain(|k, _| *k > watermark);
3290                                                        }
3291                                                    }
3292                                                })
3293                                                -> flat_map(|(map, _curr_watermark)| map);
3294                                        },
3295                                        None,
3296                                        Some(&next_stmt_id.to_string()),
3297                                    );
3298                                }
3299                            }
3300                            BuildersOrCallback::Callback(_, node_callback) => {
3301                                node_callback(node, next_stmt_id);
3302                            }
3303                        }
3304
3305                        *next_stmt_id += 1;
3306
3307                        ident_stack.push(fold_ident);
3308                    }
3309
3310                    HydroNode::Network {
3311                        serialize_fn: serialize_pipeline,
3312                        instantiate_fn,
3313                        deserialize_fn: deserialize_pipeline,
3314                        input,
3315                        ..
3316                    } => {
3317                        let input_ident = ident_stack.pop().unwrap();
3318
3319                        let receiver_stream_ident =
3320                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3321
3322                        match builders_or_callback {
3323                            BuildersOrCallback::Builders(graph_builders) => {
3324                                let (sink_expr, source_expr) = match instantiate_fn {
3325                                    DebugInstantiate::Building => (
3326                                        syn::parse_quote!(DUMMY_SINK),
3327                                        syn::parse_quote!(DUMMY_SOURCE),
3328                                    ),
3329
3330                                    DebugInstantiate::Finalized(finalized) => {
3331                                        (finalized.sink.clone(), finalized.source.clone())
3332                                    }
3333                                };
3334
3335                                graph_builders.create_network(
3336                                    &input.metadata().location_kind,
3337                                    &out_location,
3338                                    input_ident,
3339                                    &receiver_stream_ident,
3340                                    serialize_pipeline,
3341                                    sink_expr,
3342                                    source_expr,
3343                                    deserialize_pipeline,
3344                                    *next_stmt_id,
3345                                );
3346                            }
3347                            BuildersOrCallback::Callback(_, node_callback) => {
3348                                node_callback(node, next_stmt_id);
3349                            }
3350                        }
3351
3352                        *next_stmt_id += 1;
3353
3354                        ident_stack.push(receiver_stream_ident);
3355                    }
3356
3357                    HydroNode::ExternalInput {
3358                        instantiate_fn,
3359                        deserialize_fn: deserialize_pipeline,
3360                        ..
3361                    } => {
3362                        let receiver_stream_ident =
3363                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3364
3365                        match builders_or_callback {
3366                            BuildersOrCallback::Builders(graph_builders) => {
3367                                let (_, source_expr) = match instantiate_fn {
3368                                    DebugInstantiate::Building => (
3369                                        syn::parse_quote!(DUMMY_SINK),
3370                                        syn::parse_quote!(DUMMY_SOURCE),
3371                                    ),
3372
3373                                    DebugInstantiate::Finalized(finalized) => {
3374                                        (finalized.sink.clone(), finalized.source.clone())
3375                                    }
3376                                };
3377
3378                                graph_builders.create_external_source(
3379                                    &out_location,
3380                                    source_expr,
3381                                    &receiver_stream_ident,
3382                                    deserialize_pipeline,
3383                                    *next_stmt_id,
3384                                );
3385                            }
3386                            BuildersOrCallback::Callback(_, node_callback) => {
3387                                node_callback(node, next_stmt_id);
3388                            }
3389                        }
3390
3391                        *next_stmt_id += 1;
3392
3393                        ident_stack.push(receiver_stream_ident);
3394                    }
3395
3396                    HydroNode::Counter {
3397                        tag,
3398                        duration,
3399                        prefix,
3400                        ..
3401                    } => {
3402                        let input_ident = ident_stack.pop().unwrap();
3403
3404                        let counter_ident =
3405                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3406
3407                        match builders_or_callback {
3408                            BuildersOrCallback::Builders(graph_builders) => {
3409                                let builder = graph_builders.get_dfir_mut(&out_location);
3410                                builder.add_dfir(
3411                                    parse_quote! {
3412                                        #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3413                                    },
3414                                    None,
3415                                    Some(&next_stmt_id.to_string()),
3416                                );
3417                            }
3418                            BuildersOrCallback::Callback(_, node_callback) => {
3419                                node_callback(node, next_stmt_id);
3420                            }
3421                        }
3422
3423                        *next_stmt_id += 1;
3424
3425                        ident_stack.push(counter_ident);
3426                    }
3427                }
3428            },
3429            seen_tees,
3430            false,
3431        );
3432
3433        ident_stack
3434            .pop()
3435            .expect("ident_stack should have exactly one element after traversal")
3436    }
3437
3438    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3439        match self {
3440            HydroNode::Placeholder => {
3441                panic!()
3442            }
3443            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3444            HydroNode::Source { source, .. } => match source {
3445                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3446                HydroSource::ExternalNetwork()
3447                | HydroSource::Spin()
3448                | HydroSource::ClusterMembers(_) => {} // TODO: what goes here?
3449            },
3450            HydroNode::SingletonSource { value, .. } => {
3451                transform(value);
3452            }
3453            HydroNode::CycleSource { .. }
3454            | HydroNode::Tee { .. }
3455            | HydroNode::YieldConcat { .. }
3456            | HydroNode::BeginAtomic { .. }
3457            | HydroNode::EndAtomic { .. }
3458            | HydroNode::Batch { .. }
3459            | HydroNode::Chain { .. }
3460            | HydroNode::ChainFirst { .. }
3461            | HydroNode::CrossProduct { .. }
3462            | HydroNode::CrossSingleton { .. }
3463            | HydroNode::ResolveFutures { .. }
3464            | HydroNode::ResolveFuturesOrdered { .. }
3465            | HydroNode::Join { .. }
3466            | HydroNode::Difference { .. }
3467            | HydroNode::AntiJoin { .. }
3468            | HydroNode::DeferTick { .. }
3469            | HydroNode::Enumerate { .. }
3470            | HydroNode::Unique { .. }
3471            | HydroNode::Sort { .. } => {}
3472            HydroNode::Map { f, .. }
3473            | HydroNode::FlatMap { f, .. }
3474            | HydroNode::Filter { f, .. }
3475            | HydroNode::FilterMap { f, .. }
3476            | HydroNode::Inspect { f, .. }
3477            | HydroNode::Reduce { f, .. }
3478            | HydroNode::ReduceKeyed { f, .. }
3479            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3480                transform(f);
3481            }
3482            HydroNode::Fold { init, acc, .. }
3483            | HydroNode::Scan { init, acc, .. }
3484            | HydroNode::FoldKeyed { init, acc, .. } => {
3485                transform(init);
3486                transform(acc);
3487            }
3488            HydroNode::Network {
3489                serialize_fn,
3490                deserialize_fn,
3491                ..
3492            } => {
3493                if let Some(serialize_fn) = serialize_fn {
3494                    transform(serialize_fn);
3495                }
3496                if let Some(deserialize_fn) = deserialize_fn {
3497                    transform(deserialize_fn);
3498                }
3499            }
3500            HydroNode::ExternalInput { deserialize_fn, .. } => {
3501                if let Some(deserialize_fn) = deserialize_fn {
3502                    transform(deserialize_fn);
3503                }
3504            }
3505            HydroNode::Counter { duration, .. } => {
3506                transform(duration);
3507            }
3508        }
3509    }
3510
3511    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3512        &self.metadata().op
3513    }
3514
3515    pub fn metadata(&self) -> &HydroIrMetadata {
3516        match self {
3517            HydroNode::Placeholder => {
3518                panic!()
3519            }
3520            HydroNode::Cast { metadata, .. } => metadata,
3521            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3522            HydroNode::Source { metadata, .. } => metadata,
3523            HydroNode::SingletonSource { metadata, .. } => metadata,
3524            HydroNode::CycleSource { metadata, .. } => metadata,
3525            HydroNode::Tee { metadata, .. } => metadata,
3526            HydroNode::YieldConcat { metadata, .. } => metadata,
3527            HydroNode::BeginAtomic { metadata, .. } => metadata,
3528            HydroNode::EndAtomic { metadata, .. } => metadata,
3529            HydroNode::Batch { metadata, .. } => metadata,
3530            HydroNode::Chain { metadata, .. } => metadata,
3531            HydroNode::ChainFirst { metadata, .. } => metadata,
3532            HydroNode::CrossProduct { metadata, .. } => metadata,
3533            HydroNode::CrossSingleton { metadata, .. } => metadata,
3534            HydroNode::Join { metadata, .. } => metadata,
3535            HydroNode::Difference { metadata, .. } => metadata,
3536            HydroNode::AntiJoin { metadata, .. } => metadata,
3537            HydroNode::ResolveFutures { metadata, .. } => metadata,
3538            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3539            HydroNode::Map { metadata, .. } => metadata,
3540            HydroNode::FlatMap { metadata, .. } => metadata,
3541            HydroNode::Filter { metadata, .. } => metadata,
3542            HydroNode::FilterMap { metadata, .. } => metadata,
3543            HydroNode::DeferTick { metadata, .. } => metadata,
3544            HydroNode::Enumerate { metadata, .. } => metadata,
3545            HydroNode::Inspect { metadata, .. } => metadata,
3546            HydroNode::Unique { metadata, .. } => metadata,
3547            HydroNode::Sort { metadata, .. } => metadata,
3548            HydroNode::Scan { metadata, .. } => metadata,
3549            HydroNode::Fold { metadata, .. } => metadata,
3550            HydroNode::FoldKeyed { metadata, .. } => metadata,
3551            HydroNode::Reduce { metadata, .. } => metadata,
3552            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3553            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3554            HydroNode::ExternalInput { metadata, .. } => metadata,
3555            HydroNode::Network { metadata, .. } => metadata,
3556            HydroNode::Counter { metadata, .. } => metadata,
3557        }
3558    }
3559
3560    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3561        &mut self.metadata_mut().op
3562    }
3563
3564    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3565        match self {
3566            HydroNode::Placeholder => {
3567                panic!()
3568            }
3569            HydroNode::Cast { metadata, .. } => metadata,
3570            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3571            HydroNode::Source { metadata, .. } => metadata,
3572            HydroNode::SingletonSource { metadata, .. } => metadata,
3573            HydroNode::CycleSource { metadata, .. } => metadata,
3574            HydroNode::Tee { metadata, .. } => metadata,
3575            HydroNode::YieldConcat { metadata, .. } => metadata,
3576            HydroNode::BeginAtomic { metadata, .. } => metadata,
3577            HydroNode::EndAtomic { metadata, .. } => metadata,
3578            HydroNode::Batch { metadata, .. } => metadata,
3579            HydroNode::Chain { metadata, .. } => metadata,
3580            HydroNode::ChainFirst { metadata, .. } => metadata,
3581            HydroNode::CrossProduct { metadata, .. } => metadata,
3582            HydroNode::CrossSingleton { metadata, .. } => metadata,
3583            HydroNode::Join { metadata, .. } => metadata,
3584            HydroNode::Difference { metadata, .. } => metadata,
3585            HydroNode::AntiJoin { metadata, .. } => metadata,
3586            HydroNode::ResolveFutures { metadata, .. } => metadata,
3587            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3588            HydroNode::Map { metadata, .. } => metadata,
3589            HydroNode::FlatMap { metadata, .. } => metadata,
3590            HydroNode::Filter { metadata, .. } => metadata,
3591            HydroNode::FilterMap { metadata, .. } => metadata,
3592            HydroNode::DeferTick { metadata, .. } => metadata,
3593            HydroNode::Enumerate { metadata, .. } => metadata,
3594            HydroNode::Inspect { metadata, .. } => metadata,
3595            HydroNode::Unique { metadata, .. } => metadata,
3596            HydroNode::Sort { metadata, .. } => metadata,
3597            HydroNode::Scan { metadata, .. } => metadata,
3598            HydroNode::Fold { metadata, .. } => metadata,
3599            HydroNode::FoldKeyed { metadata, .. } => metadata,
3600            HydroNode::Reduce { metadata, .. } => metadata,
3601            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3602            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3603            HydroNode::ExternalInput { metadata, .. } => metadata,
3604            HydroNode::Network { metadata, .. } => metadata,
3605            HydroNode::Counter { metadata, .. } => metadata,
3606        }
3607    }
3608
3609    pub fn input(&self) -> Vec<&HydroNode> {
3610        match self {
3611            HydroNode::Placeholder => {
3612                panic!()
3613            }
3614            HydroNode::Source { .. }
3615            | HydroNode::SingletonSource { .. }
3616            | HydroNode::ExternalInput { .. }
3617            | HydroNode::CycleSource { .. }
3618            | HydroNode::Tee { .. } => {
3619                // Tee should find its input in separate special ways
3620                vec![]
3621            }
3622            HydroNode::Cast { inner, .. }
3623            | HydroNode::ObserveNonDet { inner, .. }
3624            | HydroNode::YieldConcat { inner, .. }
3625            | HydroNode::BeginAtomic { inner, .. }
3626            | HydroNode::EndAtomic { inner, .. }
3627            | HydroNode::Batch { inner, .. } => {
3628                vec![inner]
3629            }
3630            HydroNode::Chain { first, second, .. } => {
3631                vec![first, second]
3632            }
3633            HydroNode::ChainFirst { first, second, .. } => {
3634                vec![first, second]
3635            }
3636            HydroNode::CrossProduct { left, right, .. }
3637            | HydroNode::CrossSingleton { left, right, .. }
3638            | HydroNode::Join { left, right, .. } => {
3639                vec![left, right]
3640            }
3641            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3642                vec![pos, neg]
3643            }
3644            HydroNode::Map { input, .. }
3645            | HydroNode::FlatMap { input, .. }
3646            | HydroNode::Filter { input, .. }
3647            | HydroNode::FilterMap { input, .. }
3648            | HydroNode::Sort { input, .. }
3649            | HydroNode::DeferTick { input, .. }
3650            | HydroNode::Enumerate { input, .. }
3651            | HydroNode::Inspect { input, .. }
3652            | HydroNode::Unique { input, .. }
3653            | HydroNode::Network { input, .. }
3654            | HydroNode::Counter { input, .. }
3655            | HydroNode::ResolveFutures { input, .. }
3656            | HydroNode::ResolveFuturesOrdered { input, .. }
3657            | HydroNode::Fold { input, .. }
3658            | HydroNode::FoldKeyed { input, .. }
3659            | HydroNode::Reduce { input, .. }
3660            | HydroNode::ReduceKeyed { input, .. }
3661            | HydroNode::Scan { input, .. } => {
3662                vec![input]
3663            }
3664            HydroNode::ReduceKeyedWatermark {
3665                input, watermark, ..
3666            } => {
3667                vec![input, watermark]
3668            }
3669        }
3670    }
3671
3672    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3673        self.input()
3674            .iter()
3675            .map(|input_node| input_node.metadata())
3676            .collect()
3677    }
3678
3679    pub fn print_root(&self) -> String {
3680        match self {
3681            HydroNode::Placeholder => {
3682                panic!()
3683            }
3684            HydroNode::Cast { .. } => "Cast()".to_string(),
3685            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3686            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3687            HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3688            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3689            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3690            HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3691            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3692            HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3693            HydroNode::Batch { .. } => "Batch()".to_string(),
3694            HydroNode::Chain { first, second, .. } => {
3695                format!("Chain({}, {})", first.print_root(), second.print_root())
3696            }
3697            HydroNode::ChainFirst { first, second, .. } => {
3698                format!(
3699                    "ChainFirst({}, {})",
3700                    first.print_root(),
3701                    second.print_root()
3702                )
3703            }
3704            HydroNode::CrossProduct { left, right, .. } => {
3705                format!(
3706                    "CrossProduct({}, {})",
3707                    left.print_root(),
3708                    right.print_root()
3709                )
3710            }
3711            HydroNode::CrossSingleton { left, right, .. } => {
3712                format!(
3713                    "CrossSingleton({}, {})",
3714                    left.print_root(),
3715                    right.print_root()
3716                )
3717            }
3718            HydroNode::Join { left, right, .. } => {
3719                format!("Join({}, {})", left.print_root(), right.print_root())
3720            }
3721            HydroNode::Difference { pos, neg, .. } => {
3722                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3723            }
3724            HydroNode::AntiJoin { pos, neg, .. } => {
3725                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3726            }
3727            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3728            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3729            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3730            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3731            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3732            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3733            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3734            HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3735            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3736            HydroNode::Unique { .. } => "Unique()".to_string(),
3737            HydroNode::Sort { .. } => "Sort()".to_string(),
3738            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3739            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3740            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3741            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3742            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3743            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3744            HydroNode::Network { .. } => "Network()".to_string(),
3745            HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3746            HydroNode::Counter { tag, duration, .. } => {
3747                format!("Counter({:?}, {:?})", tag, duration)
3748            }
3749        }
3750    }
3751}
3752
3753#[cfg(feature = "build")]
3754fn instantiate_network<'a, D>(
3755    from_location: &LocationId,
3756    to_location: &LocationId,
3757    processes: &HashMap<usize, D::Process>,
3758    clusters: &HashMap<usize, D::Cluster>,
3759) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3760where
3761    D: Deploy<'a>,
3762{
3763    let ((sink, source), connect_fn) = match (from_location, to_location) {
3764        (LocationId::Process(from), LocationId::Process(to)) => {
3765            let from_node = processes
3766                .get(from)
3767                .unwrap_or_else(|| {
3768                    panic!("A process used in the graph was not instantiated: {}", from)
3769                })
3770                .clone();
3771            let to_node = processes
3772                .get(to)
3773                .unwrap_or_else(|| {
3774                    panic!("A process used in the graph was not instantiated: {}", to)
3775                })
3776                .clone();
3777
3778            let sink_port = from_node.next_port();
3779            let source_port = to_node.next_port();
3780
3781            (
3782                D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3783                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3784            )
3785        }
3786        (LocationId::Process(from), LocationId::Cluster(to)) => {
3787            let from_node = processes
3788                .get(from)
3789                .unwrap_or_else(|| {
3790                    panic!("A process used in the graph was not instantiated: {}", from)
3791                })
3792                .clone();
3793            let to_node = clusters
3794                .get(to)
3795                .unwrap_or_else(|| {
3796                    panic!("A cluster used in the graph was not instantiated: {}", to)
3797                })
3798                .clone();
3799
3800            let sink_port = from_node.next_port();
3801            let source_port = to_node.next_port();
3802
3803            (
3804                D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3805                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3806            )
3807        }
3808        (LocationId::Cluster(from), LocationId::Process(to)) => {
3809            let from_node = clusters
3810                .get(from)
3811                .unwrap_or_else(|| {
3812                    panic!("A cluster used in the graph was not instantiated: {}", from)
3813                })
3814                .clone();
3815            let to_node = processes
3816                .get(to)
3817                .unwrap_or_else(|| {
3818                    panic!("A process used in the graph was not instantiated: {}", to)
3819                })
3820                .clone();
3821
3822            let sink_port = from_node.next_port();
3823            let source_port = to_node.next_port();
3824
3825            (
3826                D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3827                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
3828            )
3829        }
3830        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
3831            let from_node = clusters
3832                .get(from)
3833                .unwrap_or_else(|| {
3834                    panic!("A cluster used in the graph was not instantiated: {}", from)
3835                })
3836                .clone();
3837            let to_node = clusters
3838                .get(to)
3839                .unwrap_or_else(|| {
3840                    panic!("A cluster used in the graph was not instantiated: {}", to)
3841                })
3842                .clone();
3843
3844            let sink_port = from_node.next_port();
3845            let source_port = to_node.next_port();
3846
3847            (
3848                D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3849                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
3850            )
3851        }
3852        (LocationId::Tick(_, _), _) => panic!(),
3853        (_, LocationId::Tick(_, _)) => panic!(),
3854        (LocationId::Atomic(_), _) => panic!(),
3855        (_, LocationId::Atomic(_)) => panic!(),
3856    };
3857    (sink, source, connect_fn)
3858}
3859
3860#[cfg(test)]
3861mod test {
3862    use std::mem::size_of;
3863
3864    use stageleft::{QuotedWithContext, q};
3865
3866    use super::*;
3867
3868    #[test]
3869    #[cfg_attr(
3870        not(feature = "build"),
3871        ignore = "expects inclusion of feature-gated fields"
3872    )]
3873    fn hydro_node_size() {
3874        assert_eq!(size_of::<HydroNode>(), 240);
3875    }
3876
3877    #[test]
3878    #[cfg_attr(
3879        not(feature = "build"),
3880        ignore = "expects inclusion of feature-gated fields"
3881    )]
3882    fn hydro_root_size() {
3883        assert_eq!(size_of::<HydroRoot>(), 136);
3884    }
3885
3886    #[test]
3887    fn test_simplify_q_macro_basic() {
3888        // Test basic non-q! expression
3889        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
3890        let result = simplify_q_macro(simple_expr.clone());
3891        assert_eq!(result, simple_expr);
3892    }
3893
3894    #[test]
3895    fn test_simplify_q_macro_actual_stageleft_call() {
3896        // Test a simplified version of what a real stageleft call might look like
3897        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
3898        let result = simplify_q_macro(stageleft_call);
3899        // This should be processed by our visitor and simplified to q!(...)
3900        // since we detect the stageleft::runtime_support::fn_* pattern
3901        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3902    }
3903
3904    #[test]
3905    fn test_closure_no_pipe_at_start() {
3906        // Test a closure that does not start with a pipe
3907        let stageleft_call = q!({
3908            let foo = 123;
3909            move |b: usize| b + foo
3910        })
3911        .splice_fn1_ctx(&());
3912        let result = simplify_q_macro(stageleft_call);
3913        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3914    }
3915}