hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
32/// Wrapper that displays only the tokens of a parsed expr.
33///
34/// Boxes `syn::Type` which is ~240 bytes.
35#[derive(Clone, Hash)]
36pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39    fn from(expr: syn::Expr) -> Self {
40        Self(Box::new(expr))
41    }
42}
43
44impl Deref for DebugExpr {
45    type Target = syn::Expr;
46
47    fn deref(&self) -> &Self::Target {
48        &self.0
49    }
50}
51
52impl ToTokens for DebugExpr {
53    fn to_tokens(&self, tokens: &mut TokenStream) {
54        self.0.to_tokens(tokens);
55    }
56}
57
58impl Debug for DebugExpr {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        write!(f, "{}", self.0.to_token_stream())
61    }
62}
63
64impl Display for DebugExpr {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        let original = self.0.as_ref().clone();
67        let simplified = simplify_q_macro(original);
68
69        // For now, just use quote formatting without trying to parse as a statement
70        // This avoids the syn::parse_quote! issues entirely
71        write!(f, "q!({})", quote::quote!(#simplified))
72    }
73}
74
75/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77    // Try to parse the token string as a syn::Expr
78    // Use a visitor to simplify q! macro expansions
79    let mut simplifier = QMacroSimplifier::new();
80    simplifier.visit_expr_mut(&mut expr);
81
82    // If we found and simplified a q! macro, return the simplified version
83    if let Some(simplified) = simplifier.simplified_result {
84        simplified
85    } else {
86        expr
87    }
88}
89
90/// AST visitor that simplifies q! macro expansions
91#[derive(Default)]
92pub struct QMacroSimplifier {
93    pub simplified_result: Option<syn::Expr>,
94}
95
96impl QMacroSimplifier {
97    pub fn new() -> Self {
98        Self::default()
99    }
100}
101
102impl VisitMut for QMacroSimplifier {
103    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104        // Check if we already found a result to avoid further processing
105        if self.simplified_result.is_some() {
106            return;
107        }
108
109        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110            // Look for calls to stageleft::runtime_support::fn*
111            && self.is_stageleft_runtime_support_call(&path_expr.path)
112            // Try to extract the closure from the arguments
113            && let Some(closure) = self.extract_closure_from_args(&call.args)
114        {
115            self.simplified_result = Some(closure);
116            return;
117        }
118
119        // Continue visiting child expressions using the default implementation
120        // Use the default visitor to avoid infinite recursion
121        syn::visit_mut::visit_expr_mut(self, expr);
122    }
123}
124
125impl QMacroSimplifier {
126    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127        // Check if this is a call to stageleft::runtime_support::fn*
128        if let Some(last_segment) = path.segments.last() {
129            let fn_name = last_segment.ident.to_string();
130            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
131            fn_name.contains("_type_hint")
132                && path.segments.len() > 2
133                && path.segments[0].ident == "stageleft"
134                && path.segments[1].ident == "runtime_support"
135        } else {
136            false
137        }
138    }
139
140    fn extract_closure_from_args(
141        &self,
142        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143    ) -> Option<syn::Expr> {
144        // Look through the arguments for a closure expression
145        for arg in args {
146            if let syn::Expr::Closure(_) = arg {
147                return Some(arg.clone());
148            }
149            // Also check for closures nested in other expressions (like blocks)
150            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151                return Some(closure_expr);
152            }
153        }
154        None
155    }
156
157    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158        let mut visitor = ClosureFinder {
159            found_closure: None,
160            prefer_inner_blocks: true,
161        };
162        visitor.visit_expr(expr);
163        visitor.found_closure
164    }
165}
166
167/// Visitor that finds closures in expressions with special block handling
168struct ClosureFinder {
169    found_closure: Option<syn::Expr>,
170    prefer_inner_blocks: bool,
171}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175        // If we already found a closure, don't continue searching
176        if self.found_closure.is_some() {
177            return;
178        }
179
180        match expr {
181            syn::Expr::Closure(_) => {
182                self.found_closure = Some(expr.clone());
183            }
184            syn::Expr::Block(block) if self.prefer_inner_blocks => {
185                // Special handling for blocks - look for inner blocks that contain closures
186                for stmt in &block.block.stmts {
187                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
188                        && let syn::Expr::Block(_) = stmt_expr
189                    {
190                        // Check if this nested block contains a closure
191                        let mut inner_visitor = ClosureFinder {
192                            found_closure: None,
193                            prefer_inner_blocks: false, // Avoid infinite recursion
194                        };
195                        inner_visitor.visit_expr(stmt_expr);
196                        if inner_visitor.found_closure.is_some() {
197                            // Found a closure in an inner block, return that block
198                            self.found_closure = Some(stmt_expr.clone());
199                            return;
200                        }
201                    }
202                }
203
204                // If no inner block with closure found, continue with normal visitation
205                visit::visit_expr(self, expr);
206
207                // If we found a closure, just return the closure itself, not the whole block
208                // unless we're in the special case where we want the containing block
209                if self.found_closure.is_some() {
210                    // The closure was found during visitation, no need to wrap in block
211                }
212            }
213            _ => {
214                // Use default visitor behavior for all other expressions
215                visit::visit_expr(self, expr);
216            }
217        }
218    }
219}
220
221/// Debug displays the type's tokens.
222///
223/// Boxes `syn::Type` which is ~320 bytes.
224#[derive(Clone, PartialEq, Eq, Hash)]
225pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228    fn from(t: syn::Type) -> Self {
229        Self(Box::new(t))
230    }
231}
232
233impl Deref for DebugType {
234    type Target = syn::Type;
235
236    fn deref(&self) -> &Self::Target {
237        &self.0
238    }
239}
240
241impl ToTokens for DebugType {
242    fn to_tokens(&self, tokens: &mut TokenStream) {
243        self.0.to_tokens(tokens);
244    }
245}
246
247impl Debug for DebugType {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        write!(f, "{}", self.0.to_token_stream())
250    }
251}
252
253pub enum DebugInstantiate {
254    Building,
255    Finalized(Box<DebugInstantiateFinalized>),
256}
257
258#[cfg_attr(
259    not(feature = "build"),
260    expect(
261        dead_code,
262        reason = "sink, source unused without `feature = \"build\"`."
263    )
264)]
265pub struct DebugInstantiateFinalized {
266    sink: syn::Expr,
267    source: syn::Expr,
268    connect_fn: Option<Box<dyn FnOnce()>>,
269}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272    fn from(f: DebugInstantiateFinalized) -> Self {
273        Self::Finalized(Box::new(f))
274    }
275}
276
277impl Debug for DebugInstantiate {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        write!(f, "<network instantiate>")
280    }
281}
282
283impl Hash for DebugInstantiate {
284    fn hash<H: Hasher>(&self, _state: &mut H) {
285        // Do nothing
286    }
287}
288
289impl Clone for DebugInstantiate {
290    fn clone(&self) -> Self {
291        match self {
292            DebugInstantiate::Building => DebugInstantiate::Building,
293            DebugInstantiate::Finalized(_) => {
294                panic!("DebugInstantiate::Finalized should not be cloned")
295            }
296        }
297    }
298}
299
300/// A source in a Hydro graph, where data enters the graph.
301#[derive(Debug, Hash, Clone)]
302pub enum HydroSource {
303    Stream(DebugExpr),
304    ExternalNetwork(),
305    Iter(DebugExpr),
306    Spin(),
307}
308
/// Abstracts over the parts of DFIR code generation that differ between production
/// deployment and simulation.
///
/// The intent is that a simulator can fuse all locations into a single DFIR graph, emit
/// separate graphs for each tick, and emit hooks for controlling non-deterministic
/// operators.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code that feeds `in_ident` (at `in_location`) into `out_ident`
    /// (at `out_location`) for a batch operation.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the code that feeds `in_ident` into `out_ident` when a collection is
    /// yielded out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code that feeds `in_ident` into `out_ident` at the start of an atomic
    /// section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the code that feeds `in_ident` into `out_ident` at the end of an atomic
    /// section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code that feeds `in_ident` into `out_ident` at a point where
    /// non-determinism is observed.
    // NOTE(review): the meaning of `trusted` is not visible here — confirm with callers.
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
    );

    /// Emits both halves of a network channel: at `from`, `input_ident` is sent into
    /// `sink` (optionally through `serialize`); at `to`, `out_ident` is produced from
    /// `source` (optionally through `deserialize`).
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits, at location `on`, a source reading from `source_expr` into `out_ident`
    /// (optionally through `deserialize`).
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits, at location `on`, a sink writing `input_ident` into `sink_expr`
    /// (optionally through `serialize`).
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    );
}
399
/// Builder backed by one [`FlatGraphBuilder`] per root location, keyed by the root
/// location's raw id.
///
/// Here `batch`, `yield_from_tick`, `begin_atomic`, `end_atomic` and `observe_nondet`
/// all reduce to a plain `out = in` assignment.
#[cfg(feature = "build")]
impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_id = location.root().raw_id();
        self.entry(root_id).or_default()
    }

    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote!(#out_ident = #in_ident;),
            None,
            None,
        );
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote!(#out_ident = #in_ident;),
            None,
            None,
        );
    }

    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote!(#out_ident = #in_ident;),
            None,
            None,
        );
    }

    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote!(#out_ident = #in_ident;),
            None,
            None,
        );
    }

    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote!(#out_ident = #in_ident;),
            None,
            None,
        );
    }

    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        // The operator tag separates send and receive, which otherwise have the same
        // next_stmt_id.
        let send_tag = format!("send{tag_id}");
        let recv_tag = format!("recv{tag_id}");

        // Sending half, emitted into the graph of `from`.
        let sender = self.get_dfir_mut(from);
        let send_stmt = match serialize {
            Some(serialize_fn) => parse_quote! {
                #input_ident -> map(#serialize_fn) -> dest_sink(#sink);
            },
            None => parse_quote! {
                #input_ident -> dest_sink(#sink);
            },
        };
        sender.add_dfir(send_stmt, None, Some(&send_tag));

        // Receiving half, emitted into the graph of `to`.
        let receiver = self.get_dfir_mut(to);
        let recv_stmt = match deserialize {
            Some(deserialize_fn) => parse_quote! {
                #out_ident = source_stream(#source) -> map(#deserialize_fn);
            },
            None => parse_quote! {
                #out_ident = source_stream(#source);
            },
        };
        receiver.add_dfir(recv_stmt, None, Some(&recv_tag));
    }

    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let receiver = self.get_dfir_mut(on);
        let recv_stmt = match deserialize {
            Some(deserialize_fn) => parse_quote! {
                #out_ident = source_stream(#source_expr) -> map(#deserialize_fn);
            },
            None => parse_quote! {
                #out_ident = source_stream(#source_expr);
            },
        };
        let recv_tag = format!("recv{tag_id}");
        receiver.add_dfir(recv_stmt, None, Some(&recv_tag));
    }

    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let sender = self.get_dfir_mut(on);
        let send_stmt = match serialize {
            Some(serialize_pipeline) => parse_quote! {
                #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
            },
            None => parse_quote! {
                #input_ident -> dest_sink(#sink_expr);
            },
        };
        // The operator tag separates send and receive, which otherwise have the same
        // next_stmt_id.
        let send_tag = format!("send{tag_id}");
        sender.add_dfir(send_stmt, None, Some(&send_tag));
    }
}
611
/// Either a DFIR builder to emit code into, or a pair of callbacks to run instead.
// NOTE(review): the meaning of the `&mut usize` callback argument is not visible in this
// part of the file — confirm against the code that invokes the callbacks.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR through the borrowed builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Run the first callback for roots and the second for nodes.
    Callback(L, N),
}
621
622/// An root in a Hydro graph, which is an pipeline that doesn't emit
623/// any downstream values. Traversals over the dataflow graph and
624/// generating DFIR IR start from roots.
625#[derive(Debug, Hash)]
626pub enum HydroRoot {
627    ForEach {
628        f: DebugExpr,
629        input: Box<HydroNode>,
630        op_metadata: HydroIrOpMetadata,
631    },
632    SendExternal {
633        to_external_id: usize,
634        to_key: usize,
635        to_many: bool,
636        unpaired: bool,
637        serialize_fn: Option<DebugExpr>,
638        instantiate_fn: DebugInstantiate,
639        input: Box<HydroNode>,
640        op_metadata: HydroIrOpMetadata,
641    },
642    DestSink {
643        sink: DebugExpr,
644        input: Box<HydroNode>,
645        op_metadata: HydroIrOpMetadata,
646    },
647    CycleSink {
648        ident: syn::Ident,
649        input: Box<HydroNode>,
650        op_metadata: HydroIrOpMetadata,
651    },
652}
653
654impl HydroRoot {
    /// Instantiates every network endpoint reachable from this root, replacing each
    /// [`DebugInstantiate::Building`] with a [`DebugInstantiate::Finalized`] that holds the
    /// sink expression, the source expression and a connect callback.
    ///
    /// The IR is walked with [`Self::transform_bottom_up`], and three kinds of entries
    /// are handled:
    ///
    /// - `HydroRoot::SendExternal`: only a sink expression is produced (the source slot
    ///   is a `DUMMY` placeholder). With `to_many` the sink comes from `D::e2o_many_sink`
    ///   and the connect callback is a no-op; otherwise ports are allocated and the sink
    ///   comes from `D::o2e_sink`. When `unpaired` is set, the external port is also
    ///   registered under `to_key`, `D::e2o_source` is invoked (its result is discarded),
    ///   and the connect callback comes from `D::e2o_connect`.
    /// - `HydroNode::Network`: delegated to `instantiate_network`.
    /// - `HydroNode::ExternalInput`: only a source expression is produced (the sink slot
    ///   is a `DUMMY` placeholder), from `D::e2o_many_source` or `D::e2o_source`
    ///   depending on `from_many`; the connect callback comes from `D::e2o_connect`.
    ///
    /// Statement lists in `extra_stmts` are keyed by process id and handed to the
    /// `D::e2o_*source` calls.
    ///
    /// # Panics
    ///
    /// - if an endpoint is already `Finalized` ("network already finalized");
    /// - if a referenced external or process id is missing from `externals` / `processes`;
    /// - if an external endpoint's location root is not a process (`todo!()` for clusters,
    ///   `panic!()` for anything else).
    #[cfg(feature = "build")]
    pub fn compile_network<'a, D>(
        &mut self,
        compile_env: &D::CompileEnv,
        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
        seen_tees: &mut SeenTees,
        processes: &HashMap<usize, D::Process>,
        clusters: &HashMap<usize, D::Cluster>,
        externals: &HashMap<usize, D::External>,
    ) where
        D: Deploy<'a>,
    {
        // Both closures below need mutable access to `extra_stmts`, so it is shared
        // through a `RefCell`.
        let refcell_extra_stmts = RefCell::new(extra_stmts);
        self.transform_bottom_up(
            // Root callback: only `SendExternal` roots carry a network endpoint.
            &mut |l| {
                if let HydroRoot::SendExternal {
                    input,
                    to_external_id,
                    to_key,
                    to_many,
                    unpaired,
                    instantiate_fn,
                    ..
                } = l
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let to_node = externals
                                .get(to_external_id)
                                .unwrap_or_else(|| {
                                    panic!("A external used in the graph was not instantiated: {}", to_external_id)
                                })
                                .clone();

                            match input.metadata().location_kind.root() {
                                LocationId::Process(process_id) => {
                                    if *to_many {
                                        (
                                            (
                                                D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
                                                // Placeholder for the unused source slot.
                                                parse_quote!(DUMMY),
                                            ),
                                            Box::new(|| {}) as Box<dyn FnOnce()>,
                                        )
                                    } else {
                                        let from_node = processes
                                            .get(process_id)
                                            .unwrap_or_else(|| {
                                                panic!("A process used in the graph was not instantiated: {}", process_id)
                                            })
                                            .clone();

                                        let sink_port = D::allocate_process_port(&from_node);
                                        let source_port = D::allocate_external_port(&to_node);

                                        if *unpaired {
                                            use stageleft::quote_type;
                                            use tokio_util::codec::LengthDelimitedCodec;

                                            to_node.register(*to_key, source_port.clone());

                                            // The returned expression is discarded here.
                                            let _ = D::e2o_source(
                                                compile_env,
                                                refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
                                                &to_node, &source_port,
                                                &from_node, &sink_port,
                                                &quote_type::<LengthDelimitedCodec>(),
                                                format!("{}_{}", *to_external_id, *to_key)
                                            );
                                        }

                                        (
                                            (
                                                D::o2e_sink(
                                                    compile_env,
                                                    &from_node,
                                                    &sink_port,
                                                    &to_node,
                                                    &source_port,
                                                    format!("{}_{}", *to_external_id, *to_key)
                                                ),
                                                // Placeholder for the unused source slot.
                                                parse_quote!(DUMMY),
                                            ),
                                            if *unpaired {
                                                D::e2o_connect(
                                                    &to_node,
                                                    &source_port,
                                                    &from_node,
                                                    &sink_port,
                                                    *to_many,
                                                    NetworkHint::Auto,
                                                )
                                            } else {
                                                Box::new(|| {}) as Box<dyn FnOnce()>
                                            },
                                        )
                                    }
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            // Node callback: handles `Network` and `ExternalInput` nodes.
            &mut |n| {
                if let HydroNode::Network {
                    input,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => instantiate_network::<D>(
                            input.metadata().location_kind.root(),
                            metadata.location_kind.root(),
                            processes,
                            clusters,
                            compile_env,
                        ),

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                } else if let HydroNode::ExternalInput {
                    from_external_id,
                    from_key,
                    from_many,
                    codec_type,
                    port_hint,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let from_node = externals
                                .get(from_external_id)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "A external used in the graph was not instantiated: {}",
                                        from_external_id
                                    )
                                })
                                .clone();

                            match metadata.location_kind.root() {
                                LocationId::Process(process_id) => {
                                    let to_node = processes
                                        .get(process_id)
                                        .unwrap_or_else(|| {
                                            panic!("A process used in the graph was not instantiated: {}", process_id)
                                        })
                                        .clone();

                                    let sink_port = D::allocate_external_port(&from_node);
                                    let source_port = D::allocate_process_port(&to_node);

                                    from_node.register(*from_key, sink_port.clone());

                                    (
                                        (
                                            // Placeholder for the unused sink slot.
                                            parse_quote!(DUMMY),
                                            if *from_many {
                                                D::e2o_many_source(
                                                    compile_env,
                                                    refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_id, *from_key)
                                                )
                                            } else {
                                                D::e2o_source(
                                                    compile_env,
                                                    refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
                                                    &from_node, &sink_port,
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_id, *from_key)
                                                )
                                            },
                                        ),
                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
                                    )
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            seen_tees,
            // `check_well_formed`
            false,
        );
    }
877
878    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
879        self.transform_bottom_up(
880            &mut |l| {
881                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
882                    match instantiate_fn {
883                        DebugInstantiate::Building => panic!("network not built"),
884
885                        DebugInstantiate::Finalized(finalized) => {
886                            (finalized.connect_fn.take().unwrap())();
887                        }
888                    }
889                }
890            },
891            &mut |n| {
892                if let HydroNode::Network { instantiate_fn, .. }
893                | HydroNode::ExternalInput { instantiate_fn, .. } = n
894                {
895                    match instantiate_fn {
896                        DebugInstantiate::Building => panic!("network not built"),
897
898                        DebugInstantiate::Finalized(finalized) => {
899                            (finalized.connect_fn.take().unwrap())();
900                        }
901                    }
902                }
903            },
904            seen_tees,
905            false,
906        );
907    }
908
909    pub fn transform_bottom_up(
910        &mut self,
911        transform_root: &mut impl FnMut(&mut HydroRoot),
912        transform_node: &mut impl FnMut(&mut HydroNode),
913        seen_tees: &mut SeenTees,
914        check_well_formed: bool,
915    ) {
916        self.transform_children(
917            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
918            seen_tees,
919        );
920
921        transform_root(self);
922    }
923
924    pub fn transform_children(
925        &mut self,
926        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
927        seen_tees: &mut SeenTees,
928    ) {
929        match self {
930            HydroRoot::ForEach { input, .. }
931            | HydroRoot::SendExternal { input, .. }
932            | HydroRoot::DestSink { input, .. }
933            | HydroRoot::CycleSink { input, .. } => {
934                transform(input, seen_tees);
935            }
936        }
937    }
938
939    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
940        match self {
941            HydroRoot::ForEach {
942                f,
943                input,
944                op_metadata,
945            } => HydroRoot::ForEach {
946                f: f.clone(),
947                input: Box::new(input.deep_clone(seen_tees)),
948                op_metadata: op_metadata.clone(),
949            },
950            HydroRoot::SendExternal {
951                to_external_id,
952                to_key,
953                to_many,
954                unpaired,
955                serialize_fn,
956                instantiate_fn,
957                input,
958                op_metadata,
959            } => HydroRoot::SendExternal {
960                to_external_id: *to_external_id,
961                to_key: *to_key,
962                to_many: *to_many,
963                unpaired: *unpaired,
964                serialize_fn: serialize_fn.clone(),
965                instantiate_fn: instantiate_fn.clone(),
966                input: Box::new(input.deep_clone(seen_tees)),
967                op_metadata: op_metadata.clone(),
968            },
969            HydroRoot::DestSink {
970                sink,
971                input,
972                op_metadata,
973            } => HydroRoot::DestSink {
974                sink: sink.clone(),
975                input: Box::new(input.deep_clone(seen_tees)),
976                op_metadata: op_metadata.clone(),
977            },
978            HydroRoot::CycleSink {
979                ident,
980                input,
981                op_metadata,
982            } => HydroRoot::CycleSink {
983                ident: ident.clone(),
984                input: Box::new(input.deep_clone(seen_tees)),
985                op_metadata: op_metadata.clone(),
986            },
987        }
988    }
989
990    #[cfg(feature = "build")]
991    pub fn emit(
992        &mut self,
993        graph_builders: &mut dyn DfirBuilder,
994        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
995        next_stmt_id: &mut usize,
996    ) {
997        self.emit_core(
998            &mut BuildersOrCallback::Builders::<
999                fn(&mut HydroRoot, &mut usize),
1000                fn(&mut HydroNode, &mut usize),
1001            >(graph_builders),
1002            built_tees,
1003            next_stmt_id,
1004        );
1005    }
1006
1007    #[cfg(feature = "build")]
1008    pub fn emit_core(
1009        &mut self,
1010        builders_or_callback: &mut BuildersOrCallback<
1011            impl FnMut(&mut HydroRoot, &mut usize),
1012            impl FnMut(&mut HydroNode, &mut usize),
1013        >,
1014        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1015        next_stmt_id: &mut usize,
1016    ) {
1017        match self {
1018            HydroRoot::ForEach { f, input, .. } => {
1019                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1020
1021                match builders_or_callback {
1022                    BuildersOrCallback::Builders(graph_builders) => {
1023                        graph_builders
1024                            .get_dfir_mut(&input.metadata().location_kind)
1025                            .add_dfir(
1026                                parse_quote! {
1027                                    #input_ident -> for_each(#f);
1028                                },
1029                                None,
1030                                Some(&next_stmt_id.to_string()),
1031                            );
1032                    }
1033                    BuildersOrCallback::Callback(leaf_callback, _) => {
1034                        leaf_callback(self, next_stmt_id);
1035                    }
1036                }
1037
1038                *next_stmt_id += 1;
1039            }
1040
1041            HydroRoot::SendExternal {
1042                serialize_fn,
1043                instantiate_fn,
1044                input,
1045                ..
1046            } => {
1047                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1048
1049                match builders_or_callback {
1050                    BuildersOrCallback::Builders(graph_builders) => {
1051                        let (sink_expr, _) = match instantiate_fn {
1052                            DebugInstantiate::Building => (
1053                                syn::parse_quote!(DUMMY_SINK),
1054                                syn::parse_quote!(DUMMY_SOURCE),
1055                            ),
1056
1057                            DebugInstantiate::Finalized(finalized) => {
1058                                (finalized.sink.clone(), finalized.source.clone())
1059                            }
1060                        };
1061
1062                        graph_builders.create_external_output(
1063                            &input.metadata().location_kind,
1064                            sink_expr,
1065                            &input_ident,
1066                            serialize_fn,
1067                            *next_stmt_id,
1068                        );
1069                    }
1070                    BuildersOrCallback::Callback(leaf_callback, _) => {
1071                        leaf_callback(self, next_stmt_id);
1072                    }
1073                }
1074
1075                *next_stmt_id += 1;
1076            }
1077
1078            HydroRoot::DestSink { sink, input, .. } => {
1079                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1080
1081                match builders_or_callback {
1082                    BuildersOrCallback::Builders(graph_builders) => {
1083                        graph_builders
1084                            .get_dfir_mut(&input.metadata().location_kind)
1085                            .add_dfir(
1086                                parse_quote! {
1087                                    #input_ident -> dest_sink(#sink);
1088                                },
1089                                None,
1090                                Some(&next_stmt_id.to_string()),
1091                            );
1092                    }
1093                    BuildersOrCallback::Callback(leaf_callback, _) => {
1094                        leaf_callback(self, next_stmt_id);
1095                    }
1096                }
1097
1098                *next_stmt_id += 1;
1099            }
1100
1101            HydroRoot::CycleSink { ident, input, .. } => {
1102                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1103
1104                match builders_or_callback {
1105                    BuildersOrCallback::Builders(graph_builders) => {
1106                        graph_builders
1107                            .get_dfir_mut(&input.metadata().location_kind)
1108                            .add_dfir(
1109                                parse_quote! {
1110                                    #ident = #input_ident;
1111                                },
1112                                None,
1113                                None,
1114                            );
1115                    }
1116                    // No ID, no callback
1117                    BuildersOrCallback::Callback(_, _) => {}
1118                }
1119            }
1120        }
1121    }
1122
1123    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1124        match self {
1125            HydroRoot::ForEach { op_metadata, .. }
1126            | HydroRoot::SendExternal { op_metadata, .. }
1127            | HydroRoot::DestSink { op_metadata, .. }
1128            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1129        }
1130    }
1131
1132    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1133        match self {
1134            HydroRoot::ForEach { op_metadata, .. }
1135            | HydroRoot::SendExternal { op_metadata, .. }
1136            | HydroRoot::DestSink { op_metadata, .. }
1137            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1138        }
1139    }
1140
1141    pub fn input(&self) -> &HydroNode {
1142        match self {
1143            HydroRoot::ForEach { input, .. }
1144            | HydroRoot::SendExternal { input, .. }
1145            | HydroRoot::DestSink { input, .. }
1146            | HydroRoot::CycleSink { input, .. } => input,
1147        }
1148    }
1149
1150    pub fn input_metadata(&self) -> &HydroIrMetadata {
1151        self.input().metadata()
1152    }
1153
1154    pub fn print_root(&self) -> String {
1155        match self {
1156            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1157            HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1158            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1159            HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1160        }
1161    }
1162
1163    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1164        match self {
1165            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1166                transform(f);
1167            }
1168            HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1169        }
1170    }
1171}
1172
/// Emits every root of `ir` and returns the resulting builders.
///
/// One tee table and one statement-id counter (starting at 0) are shared
/// across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut builders: BTreeMap<usize, FlatGraphBuilder> = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut()
        .for_each(|root| root.emit(&mut builders, &mut built_tees, &mut next_stmt_id));

    builders
}
1183
/// Traverses `ir` through `HydroRoot::emit_core` in callback mode: nothing is
/// emitted, and `transform_root` / `transform_node` are handed to `emit_core`
/// as the callbacks.
///
/// One tee table and one statement-id counter (starting at 0) are shared
/// across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(&mut callback, &mut seen_tees, &mut next_stmt_id);
    }
}
1197
1198pub fn transform_bottom_up(
1199    ir: &mut [HydroRoot],
1200    transform_root: &mut impl FnMut(&mut HydroRoot),
1201    transform_node: &mut impl FnMut(&mut HydroNode),
1202    check_well_formed: bool,
1203) {
1204    let mut seen_tees = HashMap::new();
1205    ir.iter_mut().for_each(|leaf| {
1206        leaf.transform_bottom_up(
1207            transform_root,
1208            transform_node,
1209            &mut seen_tees,
1210            check_well_formed,
1211        );
1212    });
1213}
1214
1215pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1216    let mut seen_tees = HashMap::new();
1217    ir.iter()
1218        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1219        .collect()
1220}
1221
1222type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1223thread_local! {
1224    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1225}
1226
1227pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1228    PRINTED_TEES.with(|printed_tees| {
1229        let mut printed_tees_mut = printed_tees.borrow_mut();
1230        *printed_tees_mut = Some((0, HashMap::new()));
1231        drop(printed_tees_mut);
1232
1233        let ret = f();
1234
1235        let mut printed_tees_mut = printed_tees.borrow_mut();
1236        *printed_tees_mut = None;
1237
1238        ret
1239    })
1240}
1241
/// A reference-counted, interior-mutable handle to a [`HydroNode`], used as the
/// `inner` of `HydroNode::Tee` so that several handles can point at the same
/// upstream node.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1243
1244impl TeeNode {
1245    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1246        Rc::as_ptr(&self.0)
1247    }
1248}
1249
1250impl Debug for TeeNode {
1251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1252        PRINTED_TEES.with(|printed_tees| {
1253            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1254            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1255
1256            if let Some(printed_tees_mut) = printed_tees_mut {
1257                if let Some(existing) = printed_tees_mut
1258                    .1
1259                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1260                {
1261                    write!(f, "<tee {}>", existing)
1262                } else {
1263                    let next_id = printed_tees_mut.0;
1264                    printed_tees_mut.0 += 1;
1265                    printed_tees_mut
1266                        .1
1267                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1268                    drop(printed_tees_mut_borrow);
1269                    write!(f, "<tee {}>: ", next_id)?;
1270                    Debug::fmt(&self.0.borrow(), f)
1271                }
1272            } else {
1273                drop(printed_tees_mut_borrow);
1274                write!(f, "<tee>: ")?;
1275                Debug::fmt(&self.0.borrow(), f)
1276            }
1277        })
1278    }
1279}
1280
1281impl Hash for TeeNode {
1282    fn hash<H: Hasher>(&self, state: &mut H) {
1283        self.0.borrow_mut().hash(state);
1284    }
1285}
1286
/// Boundedness tag stored in the `bound` field of the `Stream`, `Singleton`,
/// `Optional` and `KeyedStream` variants of [`CollectionKind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1292
/// Ordering tag stored in the `order` field of `CollectionKind::Stream` and the
/// `value_order` field of `CollectionKind::KeyedStream`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1298
/// Retry tag stored in the `retry` field of `CollectionKind::Stream` and the
/// `value_retry` field of `CollectionKind::KeyedStream`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1304
/// Boundedness tag stored in the `bound` field of `CollectionKind::KeyedSingleton`.
///
/// NOTE(review): `BoundedValue` presumably marks bounded values under an
/// unbounded key set; confirm against the keyed-singleton API.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1311
/// The kind of collection a node produces, together with its guarantee tags
/// and element types. Stored in `HydroIrMetadata::collection_kind`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream with boundedness, ordering and retry tags.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton with a boundedness tag.
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// An optional with a boundedness tag.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the ordering and retry tags apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton, which uses its own boundedness tag type.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1341
/// Metadata attached to a [`HydroNode`].
///
/// The trait impls in this file make it transparent to hashing and comparison
/// (hashing writes nothing, equality is always `true`), and `Debug` prints only
/// `location_kind` and `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location of the node; `HydroNode::transform_bottom_up` compares the
    /// `root()` of this value between a node and its inputs.
    pub location_kind: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably an estimated output size; not set in this part of the file.
    pub cardinality: Option<usize>,
    // NOTE(review): presumably a user-supplied label; not set in this part of the file.
    pub tag: Option<String>,
    /// Operator-level metadata (backtrace, usage figures, id).
    pub op: HydroIrOpMetadata,
}
1350
1351// HydroIrMetadata shouldn't be used to hash or compare
1352impl Hash for HydroIrMetadata {
1353    fn hash<H: Hasher>(&self, _: &mut H) {}
1354}
1355
1356impl PartialEq for HydroIrMetadata {
1357    fn eq(&self, _: &Self) -> bool {
1358        true
1359    }
1360}
1361
1362impl Eq for HydroIrMetadata {}
1363
1364impl Debug for HydroIrMetadata {
1365    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1366        f.debug_struct("HydroIrMetadata")
1367            .field("location_kind", &self.location_kind)
1368            .field("collection_kind", &self.collection_kind)
1369            .finish()
1370    }
1371}
1372
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata is created (see `HydroIrOpMetadata::new`).
    pub backtrace: Backtrace,
    // Starts as `None`. NOTE(review): presumably filled in by profiling; confirm.
    pub cpu_usage: Option<f64>,
    // Starts as `None`. NOTE(review): presumably filled in by profiling; confirm.
    pub network_recv_cpu_usage: Option<f64>,
    // Starts as `None`. NOTE(review): presumably assigned by a later pass; confirm.
    pub id: Option<usize>,
}
1382
1383impl HydroIrOpMetadata {
1384    #[expect(
1385        clippy::new_without_default,
1386        reason = "explicit calls to new ensure correct backtrace bounds"
1387    )]
1388    pub fn new() -> HydroIrOpMetadata {
1389        Self::new_with_skip(1)
1390    }
1391
1392    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1393        HydroIrOpMetadata {
1394            backtrace: Backtrace::get_backtrace(2 + skip_count),
1395            cpu_usage: None,
1396            network_recv_cpu_usage: None,
1397            id: None,
1398        }
1399    }
1400}
1401
1402impl Debug for HydroIrOpMetadata {
1403    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1404        f.debug_struct("HydroIrOpMetadata").finish()
1405    }
1406}
1407
1408impl Hash for HydroIrOpMetadata {
1409    fn hash<H: Hasher>(&self, _: &mut H) {}
1410}
1411
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Variants fall into three shapes as far as traversal is concerned
/// (see `HydroNode::transform_children`): leaves with no upstream node,
/// nodes with one upstream node, and nodes with two upstream nodes.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Stand-in value. The tee handling in `transform_children` and `deep_clone`
    /// stores it temporarily in a tee cell while the real node is being processed;
    /// `transform_children` panics when asked to traverse it.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    /// Leaf node: has no upstream node.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node: has no upstream node.
    SingletonSource {
        value: DebugExpr,
        metadata: HydroIrMetadata,
    },

    /// Leaf node: has no upstream node. Carries the identifier of a cycle.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// Reference to a shared upstream node. Traversals use [`SeenTees`] so the
    /// shared node is processed once even when several `Tee`s point at it.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    Persist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Has two upstream nodes: `input` and `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only variant exempt from the location-consistency check in
    /// `transform_bottom_up`: its input may have a different root location.
    Network {
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node: has no upstream node inside the graph.
    ExternalInput {
        from_external_id: usize,
        from_key: usize,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1645
/// Maps the address of an original tee cell to the cell that replaces it.
/// `transform_children` and `deep_clone` consult it so that a tee shared by
/// several parents is processed once and every parent ends up pointing at the
/// same replacement.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a [`LocationId`].
// NOTE(review): no use of this alias is visible in this part of the file; confirm its purpose.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1648
1649impl HydroNode {
1650    pub fn transform_bottom_up(
1651        &mut self,
1652        transform: &mut impl FnMut(&mut HydroNode),
1653        seen_tees: &mut SeenTees,
1654        check_well_formed: bool,
1655    ) {
1656        self.transform_children(
1657            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1658            seen_tees,
1659        );
1660
1661        transform(self);
1662
1663        let self_location = self.metadata().location_kind.root();
1664
1665        if check_well_formed {
1666            match &*self {
1667                HydroNode::Network { .. } => {}
1668                _ => {
1669                    self.input_metadata().iter().for_each(|i| {
1670                        if i.location_kind.root() != self_location {
1671                            panic!(
1672                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1673                                i,
1674                                i.location_kind.root(),
1675                                self,
1676                                self_location
1677                            )
1678                        }
1679                    });
1680                }
1681            }
1682        }
1683    }
1684
1685    #[inline(always)]
1686    pub fn transform_children(
1687        &mut self,
1688        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1689        seen_tees: &mut SeenTees,
1690    ) {
1691        match self {
1692            HydroNode::Placeholder => {
1693                panic!();
1694            }
1695
1696            HydroNode::Source { .. }
1697            | HydroNode::SingletonSource { .. }
1698            | HydroNode::CycleSource { .. }
1699            | HydroNode::ExternalInput { .. } => {}
1700
1701            HydroNode::Tee { inner, .. } => {
1702                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1703                    *inner = TeeNode(transformed.clone());
1704                } else {
1705                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1706                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1707                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1708                    transform(&mut orig, seen_tees);
1709                    *transformed_cell.borrow_mut() = orig;
1710                    *inner = TeeNode(transformed_cell);
1711                }
1712            }
1713
1714            HydroNode::Cast { inner, .. }
1715            | HydroNode::ObserveNonDet { inner, .. }
1716            | HydroNode::Persist { inner, .. }
1717            | HydroNode::BeginAtomic { inner, .. }
1718            | HydroNode::EndAtomic { inner, .. }
1719            | HydroNode::Batch { inner, .. }
1720            | HydroNode::YieldConcat { inner, .. } => {
1721                transform(inner.as_mut(), seen_tees);
1722            }
1723
1724            HydroNode::Chain { first, second, .. } => {
1725                transform(first.as_mut(), seen_tees);
1726                transform(second.as_mut(), seen_tees);
1727            }
1728
1729            HydroNode::ChainFirst { first, second, .. } => {
1730                transform(first.as_mut(), seen_tees);
1731                transform(second.as_mut(), seen_tees);
1732            }
1733
1734            HydroNode::CrossSingleton { left, right, .. }
1735            | HydroNode::CrossProduct { left, right, .. }
1736            | HydroNode::Join { left, right, .. } => {
1737                transform(left.as_mut(), seen_tees);
1738                transform(right.as_mut(), seen_tees);
1739            }
1740
1741            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1742                transform(pos.as_mut(), seen_tees);
1743                transform(neg.as_mut(), seen_tees);
1744            }
1745
1746            HydroNode::ReduceKeyedWatermark {
1747                input, watermark, ..
1748            } => {
1749                transform(input.as_mut(), seen_tees);
1750                transform(watermark.as_mut(), seen_tees);
1751            }
1752
1753            HydroNode::Map { input, .. }
1754            | HydroNode::ResolveFutures { input, .. }
1755            | HydroNode::ResolveFuturesOrdered { input, .. }
1756            | HydroNode::FlatMap { input, .. }
1757            | HydroNode::Filter { input, .. }
1758            | HydroNode::FilterMap { input, .. }
1759            | HydroNode::Sort { input, .. }
1760            | HydroNode::DeferTick { input, .. }
1761            | HydroNode::Enumerate { input, .. }
1762            | HydroNode::Inspect { input, .. }
1763            | HydroNode::Unique { input, .. }
1764            | HydroNode::Network { input, .. }
1765            | HydroNode::Fold { input, .. }
1766            | HydroNode::Scan { input, .. }
1767            | HydroNode::FoldKeyed { input, .. }
1768            | HydroNode::Reduce { input, .. }
1769            | HydroNode::ReduceKeyed { input, .. }
1770            | HydroNode::Counter { input, .. } => {
1771                transform(input.as_mut(), seen_tees);
1772            }
1773        }
1774    }
1775
1776    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1777        match self {
1778            HydroNode::Placeholder => HydroNode::Placeholder,
1779            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1780                inner: Box::new(inner.deep_clone(seen_tees)),
1781                metadata: metadata.clone(),
1782            },
1783            HydroNode::ObserveNonDet {
1784                inner,
1785                trusted,
1786                metadata,
1787            } => HydroNode::ObserveNonDet {
1788                inner: Box::new(inner.deep_clone(seen_tees)),
1789                trusted: *trusted,
1790                metadata: metadata.clone(),
1791            },
1792            HydroNode::Source { source, metadata } => HydroNode::Source {
1793                source: source.clone(),
1794                metadata: metadata.clone(),
1795            },
1796            HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1797                value: value.clone(),
1798                metadata: metadata.clone(),
1799            },
1800            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1801                ident: ident.clone(),
1802                metadata: metadata.clone(),
1803            },
1804            HydroNode::Tee { inner, metadata } => {
1805                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1806                    HydroNode::Tee {
1807                        inner: TeeNode(transformed.clone()),
1808                        metadata: metadata.clone(),
1809                    }
1810                } else {
1811                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1812                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1813                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1814                    *new_rc.borrow_mut() = cloned;
1815                    HydroNode::Tee {
1816                        inner: TeeNode(new_rc),
1817                        metadata: metadata.clone(),
1818                    }
1819                }
1820            }
1821            HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1822                inner: Box::new(inner.deep_clone(seen_tees)),
1823                metadata: metadata.clone(),
1824            },
1825            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1826                inner: Box::new(inner.deep_clone(seen_tees)),
1827                metadata: metadata.clone(),
1828            },
1829            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1830                inner: Box::new(inner.deep_clone(seen_tees)),
1831                metadata: metadata.clone(),
1832            },
1833            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1834                inner: Box::new(inner.deep_clone(seen_tees)),
1835                metadata: metadata.clone(),
1836            },
1837            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1838                inner: Box::new(inner.deep_clone(seen_tees)),
1839                metadata: metadata.clone(),
1840            },
1841            HydroNode::Chain {
1842                first,
1843                second,
1844                metadata,
1845            } => HydroNode::Chain {
1846                first: Box::new(first.deep_clone(seen_tees)),
1847                second: Box::new(second.deep_clone(seen_tees)),
1848                metadata: metadata.clone(),
1849            },
1850            HydroNode::ChainFirst {
1851                first,
1852                second,
1853                metadata,
1854            } => HydroNode::ChainFirst {
1855                first: Box::new(first.deep_clone(seen_tees)),
1856                second: Box::new(second.deep_clone(seen_tees)),
1857                metadata: metadata.clone(),
1858            },
1859            HydroNode::CrossProduct {
1860                left,
1861                right,
1862                metadata,
1863            } => HydroNode::CrossProduct {
1864                left: Box::new(left.deep_clone(seen_tees)),
1865                right: Box::new(right.deep_clone(seen_tees)),
1866                metadata: metadata.clone(),
1867            },
1868            HydroNode::CrossSingleton {
1869                left,
1870                right,
1871                metadata,
1872            } => HydroNode::CrossSingleton {
1873                left: Box::new(left.deep_clone(seen_tees)),
1874                right: Box::new(right.deep_clone(seen_tees)),
1875                metadata: metadata.clone(),
1876            },
1877            HydroNode::Join {
1878                left,
1879                right,
1880                metadata,
1881            } => HydroNode::Join {
1882                left: Box::new(left.deep_clone(seen_tees)),
1883                right: Box::new(right.deep_clone(seen_tees)),
1884                metadata: metadata.clone(),
1885            },
1886            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1887                pos: Box::new(pos.deep_clone(seen_tees)),
1888                neg: Box::new(neg.deep_clone(seen_tees)),
1889                metadata: metadata.clone(),
1890            },
1891            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1892                pos: Box::new(pos.deep_clone(seen_tees)),
1893                neg: Box::new(neg.deep_clone(seen_tees)),
1894                metadata: metadata.clone(),
1895            },
1896            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1897                input: Box::new(input.deep_clone(seen_tees)),
1898                metadata: metadata.clone(),
1899            },
1900            HydroNode::ResolveFuturesOrdered { input, metadata } => {
1901                HydroNode::ResolveFuturesOrdered {
1902                    input: Box::new(input.deep_clone(seen_tees)),
1903                    metadata: metadata.clone(),
1904                }
1905            }
1906            HydroNode::Map { f, input, metadata } => HydroNode::Map {
1907                f: f.clone(),
1908                input: Box::new(input.deep_clone(seen_tees)),
1909                metadata: metadata.clone(),
1910            },
1911            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1912                f: f.clone(),
1913                input: Box::new(input.deep_clone(seen_tees)),
1914                metadata: metadata.clone(),
1915            },
1916            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1917                f: f.clone(),
1918                input: Box::new(input.deep_clone(seen_tees)),
1919                metadata: metadata.clone(),
1920            },
1921            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1922                f: f.clone(),
1923                input: Box::new(input.deep_clone(seen_tees)),
1924                metadata: metadata.clone(),
1925            },
1926            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1927                input: Box::new(input.deep_clone(seen_tees)),
1928                metadata: metadata.clone(),
1929            },
1930            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1931                input: Box::new(input.deep_clone(seen_tees)),
1932                metadata: metadata.clone(),
1933            },
1934            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1935                f: f.clone(),
1936                input: Box::new(input.deep_clone(seen_tees)),
1937                metadata: metadata.clone(),
1938            },
1939            HydroNode::Unique { input, metadata } => HydroNode::Unique {
1940                input: Box::new(input.deep_clone(seen_tees)),
1941                metadata: metadata.clone(),
1942            },
1943            HydroNode::Sort { input, metadata } => HydroNode::Sort {
1944                input: Box::new(input.deep_clone(seen_tees)),
1945                metadata: metadata.clone(),
1946            },
1947            HydroNode::Fold {
1948                init,
1949                acc,
1950                input,
1951                metadata,
1952            } => HydroNode::Fold {
1953                init: init.clone(),
1954                acc: acc.clone(),
1955                input: Box::new(input.deep_clone(seen_tees)),
1956                metadata: metadata.clone(),
1957            },
1958            HydroNode::Scan {
1959                init,
1960                acc,
1961                input,
1962                metadata,
1963            } => HydroNode::Scan {
1964                init: init.clone(),
1965                acc: acc.clone(),
1966                input: Box::new(input.deep_clone(seen_tees)),
1967                metadata: metadata.clone(),
1968            },
1969            HydroNode::FoldKeyed {
1970                init,
1971                acc,
1972                input,
1973                metadata,
1974            } => HydroNode::FoldKeyed {
1975                init: init.clone(),
1976                acc: acc.clone(),
1977                input: Box::new(input.deep_clone(seen_tees)),
1978                metadata: metadata.clone(),
1979            },
1980            HydroNode::ReduceKeyedWatermark {
1981                f,
1982                input,
1983                watermark,
1984                metadata,
1985            } => HydroNode::ReduceKeyedWatermark {
1986                f: f.clone(),
1987                input: Box::new(input.deep_clone(seen_tees)),
1988                watermark: Box::new(watermark.deep_clone(seen_tees)),
1989                metadata: metadata.clone(),
1990            },
1991            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1992                f: f.clone(),
1993                input: Box::new(input.deep_clone(seen_tees)),
1994                metadata: metadata.clone(),
1995            },
1996            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1997                f: f.clone(),
1998                input: Box::new(input.deep_clone(seen_tees)),
1999                metadata: metadata.clone(),
2000            },
2001            HydroNode::Network {
2002                serialize_fn,
2003                instantiate_fn,
2004                deserialize_fn,
2005                input,
2006                metadata,
2007            } => HydroNode::Network {
2008                serialize_fn: serialize_fn.clone(),
2009                instantiate_fn: instantiate_fn.clone(),
2010                deserialize_fn: deserialize_fn.clone(),
2011                input: Box::new(input.deep_clone(seen_tees)),
2012                metadata: metadata.clone(),
2013            },
2014            HydroNode::ExternalInput {
2015                from_external_id,
2016                from_key,
2017                from_many,
2018                codec_type,
2019                port_hint,
2020                instantiate_fn,
2021                deserialize_fn,
2022                metadata,
2023            } => HydroNode::ExternalInput {
2024                from_external_id: *from_external_id,
2025                from_key: *from_key,
2026                from_many: *from_many,
2027                codec_type: codec_type.clone(),
2028                port_hint: *port_hint,
2029                instantiate_fn: instantiate_fn.clone(),
2030                deserialize_fn: deserialize_fn.clone(),
2031                metadata: metadata.clone(),
2032            },
2033            HydroNode::Counter {
2034                tag,
2035                duration,
2036                prefix,
2037                input,
2038                metadata,
2039            } => HydroNode::Counter {
2040                tag: tag.clone(),
2041                duration: duration.clone(),
2042                prefix: prefix.clone(),
2043                input: Box::new(input.deep_clone(seen_tees)),
2044                metadata: metadata.clone(),
2045            },
2046        }
2047    }
2048
2049    #[cfg(feature = "build")]
2050    pub fn emit_core(
2051        &mut self,
2052        builders_or_callback: &mut BuildersOrCallback<
2053            impl FnMut(&mut HydroRoot, &mut usize),
2054            impl FnMut(&mut HydroNode, &mut usize),
2055        >,
2056        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2057        next_stmt_id: &mut usize,
2058    ) -> syn::Ident {
2059        let out_location = self.metadata().location_kind.clone();
2060        match self {
2061            HydroNode::Placeholder => {
2062                panic!()
2063            }
2064
2065            HydroNode::Cast { inner, .. } => {
2066                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2067
2068                match builders_or_callback {
2069                    BuildersOrCallback::Builders(_) => {}
2070                    BuildersOrCallback::Callback(_, node_callback) => {
2071                        node_callback(self, next_stmt_id);
2072                    }
2073                }
2074
2075                *next_stmt_id += 1;
2076
2077                inner_ident
2078            }
2079
2080            HydroNode::ObserveNonDet {
2081                inner,
2082                trusted,
2083                metadata,
2084                ..
2085            } => {
2086                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2087
2088                let observe_ident =
2089                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2090
2091                match builders_or_callback {
2092                    BuildersOrCallback::Builders(graph_builders) => {
2093                        graph_builders.observe_nondet(
2094                            *trusted,
2095                            &inner.metadata().location_kind,
2096                            inner_ident,
2097                            &inner.metadata().collection_kind,
2098                            &observe_ident,
2099                            &metadata.collection_kind,
2100                        );
2101                    }
2102                    BuildersOrCallback::Callback(_, node_callback) => {
2103                        node_callback(self, next_stmt_id);
2104                    }
2105                }
2106
2107                *next_stmt_id += 1;
2108
2109                observe_ident
2110            }
2111
2112            HydroNode::Persist { inner, .. } => {
2113                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2114
2115                let persist_ident =
2116                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2117
2118                match builders_or_callback {
2119                    BuildersOrCallback::Builders(graph_builders) => {
2120                        let builder = graph_builders.get_dfir_mut(&out_location);
2121                        builder.add_dfir(
2122                            parse_quote! {
2123                                #persist_ident = #inner_ident -> persist::<'static>();
2124                            },
2125                            None,
2126                            Some(&next_stmt_id.to_string()),
2127                        );
2128                    }
2129                    BuildersOrCallback::Callback(_, node_callback) => {
2130                        node_callback(self, next_stmt_id);
2131                    }
2132                }
2133
2134                *next_stmt_id += 1;
2135
2136                persist_ident
2137            }
2138
2139            HydroNode::Batch {
2140                inner, metadata, ..
2141            } => {
2142                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2143
2144                let batch_ident =
2145                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2146
2147                match builders_or_callback {
2148                    BuildersOrCallback::Builders(graph_builders) => {
2149                        graph_builders.batch(
2150                            inner_ident,
2151                            &inner.metadata().location_kind,
2152                            &inner.metadata().collection_kind,
2153                            &batch_ident,
2154                            &out_location,
2155                            &metadata.op,
2156                        );
2157                    }
2158                    BuildersOrCallback::Callback(_, node_callback) => {
2159                        node_callback(self, next_stmt_id);
2160                    }
2161                }
2162
2163                *next_stmt_id += 1;
2164
2165                batch_ident
2166            }
2167
2168            HydroNode::YieldConcat { inner, .. } => {
2169                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2170
2171                let yield_ident =
2172                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2173
2174                match builders_or_callback {
2175                    BuildersOrCallback::Builders(graph_builders) => {
2176                        graph_builders.yield_from_tick(
2177                            inner_ident,
2178                            &inner.metadata().location_kind,
2179                            &inner.metadata().collection_kind,
2180                            &yield_ident,
2181                            &out_location,
2182                        );
2183                    }
2184                    BuildersOrCallback::Callback(_, node_callback) => {
2185                        node_callback(self, next_stmt_id);
2186                    }
2187                }
2188
2189                *next_stmt_id += 1;
2190
2191                yield_ident
2192            }
2193
2194            HydroNode::BeginAtomic { inner, metadata } => {
2195                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2196
2197                let begin_ident =
2198                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2199
2200                match builders_or_callback {
2201                    BuildersOrCallback::Builders(graph_builders) => {
2202                        graph_builders.begin_atomic(
2203                            inner_ident,
2204                            &inner.metadata().location_kind,
2205                            &inner.metadata().collection_kind,
2206                            &begin_ident,
2207                            &out_location,
2208                            &metadata.op,
2209                        );
2210                    }
2211                    BuildersOrCallback::Callback(_, node_callback) => {
2212                        node_callback(self, next_stmt_id);
2213                    }
2214                }
2215
2216                *next_stmt_id += 1;
2217
2218                begin_ident
2219            }
2220
2221            HydroNode::EndAtomic { inner, .. } => {
2222                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2223
2224                let end_ident =
2225                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2226
2227                match builders_or_callback {
2228                    BuildersOrCallback::Builders(graph_builders) => {
2229                        graph_builders.end_atomic(
2230                            inner_ident,
2231                            &inner.metadata().location_kind,
2232                            &inner.metadata().collection_kind,
2233                            &end_ident,
2234                        );
2235                    }
2236                    BuildersOrCallback::Callback(_, node_callback) => {
2237                        node_callback(self, next_stmt_id);
2238                    }
2239                }
2240
2241                *next_stmt_id += 1;
2242
2243                end_ident
2244            }
2245
2246            HydroNode::Source {
2247                source, metadata, ..
2248            } => {
2249                if let HydroSource::ExternalNetwork() = source {
2250                    syn::Ident::new("DUMMY", Span::call_site())
2251                } else {
2252                    let source_ident =
2253                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2254
2255                    let source_stmt = match source {
2256                        HydroSource::Stream(expr) => {
2257                            debug_assert!(metadata.location_kind.is_top_level());
2258                            parse_quote! {
2259                                #source_ident = source_stream(#expr);
2260                            }
2261                        }
2262
2263                        HydroSource::ExternalNetwork() => {
2264                            unreachable!()
2265                        }
2266
2267                        HydroSource::Iter(expr) => {
2268                            if metadata.location_kind.is_top_level() {
2269                                parse_quote! {
2270                                    #source_ident = source_iter(#expr);
2271                                }
2272                            } else {
2273                                // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
2274                                parse_quote! {
2275                                    #source_ident = source_iter(#expr) -> persist::<'static>();
2276                                }
2277                            }
2278                        }
2279
2280                        HydroSource::Spin() => {
2281                            debug_assert!(metadata.location_kind.is_top_level());
2282                            parse_quote! {
2283                                #source_ident = spin();
2284                            }
2285                        }
2286                    };
2287
2288                    match builders_or_callback {
2289                        BuildersOrCallback::Builders(graph_builders) => {
2290                            let builder = graph_builders.get_dfir_mut(&out_location);
2291                            builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2292                        }
2293                        BuildersOrCallback::Callback(_, node_callback) => {
2294                            node_callback(self, next_stmt_id);
2295                        }
2296                    }
2297
2298                    *next_stmt_id += 1;
2299
2300                    source_ident
2301                }
2302            }
2303
2304            HydroNode::SingletonSource { value, metadata } => {
2305                let source_ident =
2306                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2307
2308                match builders_or_callback {
2309                    BuildersOrCallback::Builders(graph_builders) => {
2310                        let should_replay = !graph_builders.singleton_intermediates();
2311                        let builder = graph_builders.get_dfir_mut(&out_location);
2312
2313                        if should_replay || !metadata.location_kind.is_top_level() {
2314                            builder.add_dfir(
2315                                parse_quote! {
2316                                    #source_ident = source_iter([#value]) -> persist::<'static>();
2317                                },
2318                                None,
2319                                Some(&next_stmt_id.to_string()),
2320                            );
2321                        } else {
2322                            builder.add_dfir(
2323                                parse_quote! {
2324                                    #source_ident = source_iter([#value]);
2325                                },
2326                                None,
2327                                Some(&next_stmt_id.to_string()),
2328                            );
2329                        }
2330                    }
2331                    BuildersOrCallback::Callback(_, node_callback) => {
2332                        node_callback(self, next_stmt_id);
2333                    }
2334                }
2335
2336                *next_stmt_id += 1;
2337
2338                source_ident
2339            }
2340
2341            HydroNode::CycleSource { ident, .. } => {
2342                let ident = ident.clone();
2343
2344                match builders_or_callback {
2345                    BuildersOrCallback::Builders(_) => {}
2346                    BuildersOrCallback::Callback(_, node_callback) => {
2347                        node_callback(self, next_stmt_id);
2348                    }
2349                }
2350
2351                // consume a stmt id even though we did not emit anything so that we can instrument this
2352                *next_stmt_id += 1;
2353
2354                ident
2355            }
2356
2357            HydroNode::Tee { inner, .. } => {
2358                let ret_ident = if let Some(teed_from) =
2359                    built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2360                {
2361                    match builders_or_callback {
2362                        BuildersOrCallback::Builders(_) => {}
2363                        BuildersOrCallback::Callback(_, node_callback) => {
2364                            node_callback(self, next_stmt_id);
2365                        }
2366                    }
2367
2368                    teed_from.clone()
2369                } else {
2370                    let inner_ident = inner.0.borrow_mut().emit_core(
2371                        builders_or_callback,
2372                        built_tees,
2373                        next_stmt_id,
2374                    );
2375
2376                    let tee_ident =
2377                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2378
2379                    built_tees.insert(
2380                        inner.0.as_ref() as *const RefCell<HydroNode>,
2381                        tee_ident.clone(),
2382                    );
2383
2384                    match builders_or_callback {
2385                        BuildersOrCallback::Builders(graph_builders) => {
2386                            let builder = graph_builders.get_dfir_mut(&out_location);
2387                            builder.add_dfir(
2388                                parse_quote! {
2389                                    #tee_ident = #inner_ident -> tee();
2390                                },
2391                                None,
2392                                Some(&next_stmt_id.to_string()),
2393                            );
2394                        }
2395                        BuildersOrCallback::Callback(_, node_callback) => {
2396                            node_callback(self, next_stmt_id);
2397                        }
2398                    }
2399
2400                    tee_ident
2401                };
2402
2403                // we consume a stmt id regardless of if we emit the tee() operator,
2404                // so that during rewrites we touch all recipients of the tee()
2405
2406                *next_stmt_id += 1;
2407                ret_ident
2408            }
2409
2410            HydroNode::Chain { first, second, .. } => {
2411                let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2412                let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2413
2414                let chain_ident =
2415                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2416
2417                match builders_or_callback {
2418                    BuildersOrCallback::Builders(graph_builders) => {
2419                        let builder = graph_builders.get_dfir_mut(&out_location);
2420                        builder.add_dfir(
2421                            parse_quote! {
2422                                #chain_ident = chain();
2423                                #first_ident -> [0]#chain_ident;
2424                                #second_ident -> [1]#chain_ident;
2425                            },
2426                            None,
2427                            Some(&next_stmt_id.to_string()),
2428                        );
2429                    }
2430                    BuildersOrCallback::Callback(_, node_callback) => {
2431                        node_callback(self, next_stmt_id);
2432                    }
2433                }
2434
2435                *next_stmt_id += 1;
2436
2437                chain_ident
2438            }
2439
2440            HydroNode::ChainFirst { first, second, .. } => {
2441                let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2442                let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2443
2444                let chain_ident =
2445                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2446
2447                match builders_or_callback {
2448                    BuildersOrCallback::Builders(graph_builders) => {
2449                        let builder = graph_builders.get_dfir_mut(&out_location);
2450                        builder.add_dfir(
2451                            parse_quote! {
2452                                #chain_ident = chain_first_n(1);
2453                                #first_ident -> [0]#chain_ident;
2454                                #second_ident -> [1]#chain_ident;
2455                            },
2456                            None,
2457                            Some(&next_stmt_id.to_string()),
2458                        );
2459                    }
2460                    BuildersOrCallback::Callback(_, node_callback) => {
2461                        node_callback(self, next_stmt_id);
2462                    }
2463                }
2464
2465                *next_stmt_id += 1;
2466
2467                chain_ident
2468            }
2469
2470            HydroNode::CrossSingleton { left, right, .. } => {
2471                let left_ident = left.emit_core(builders_or_callback, built_tees, next_stmt_id);
2472                let right_ident = right.emit_core(builders_or_callback, built_tees, next_stmt_id);
2473
2474                let cross_ident =
2475                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2476
2477                match builders_or_callback {
2478                    BuildersOrCallback::Builders(graph_builders) => {
2479                        let builder = graph_builders.get_dfir_mut(&out_location);
2480                        builder.add_dfir(
2481                            parse_quote! {
2482                                #cross_ident = cross_singleton();
2483                                #left_ident -> [input]#cross_ident;
2484                                #right_ident -> [single]#cross_ident;
2485                            },
2486                            None,
2487                            Some(&next_stmt_id.to_string()),
2488                        );
2489                    }
2490                    BuildersOrCallback::Callback(_, node_callback) => {
2491                        node_callback(self, next_stmt_id);
2492                    }
2493                }
2494
2495                *next_stmt_id += 1;
2496
2497                cross_ident
2498            }
2499
2500            HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2501                let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
2502                    parse_quote!(cross_join_multiset)
2503                } else {
2504                    parse_quote!(join_multiset)
2505                };
2506
2507                let (HydroNode::CrossProduct { left, right, .. }
2508                | HydroNode::Join { left, right, .. }) = self
2509                else {
2510                    unreachable!()
2511                };
2512
2513                let is_top_level = left.metadata().location_kind.is_top_level()
2514                    && right.metadata().location_kind.is_top_level();
2515                let (left_inner, left_lifetime) =
2516                    if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
2517                        debug_assert!(!left.metadata().location_kind.is_top_level());
2518                        (left, quote!('static))
2519                    } else if left.metadata().location_kind.is_top_level() {
2520                        (left, quote!('static))
2521                    } else {
2522                        (left, quote!('tick))
2523                    };
2524
2525                let (right_inner, right_lifetime) =
2526                    if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
2527                        debug_assert!(!right.metadata().location_kind.is_top_level());
2528                        (right, quote!('static))
2529                    } else if right.metadata().location_kind.is_top_level() {
2530                        (right, quote!('static))
2531                    } else {
2532                        (right, quote!('tick))
2533                    };
2534
2535                let left_ident =
2536                    left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2537                let right_ident =
2538                    right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2539
2540                let stream_ident =
2541                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2542
2543                match builders_or_callback {
2544                    BuildersOrCallback::Builders(graph_builders) => {
2545                        let builder = graph_builders.get_dfir_mut(&out_location);
2546                        builder.add_dfir(
2547                            if is_top_level {
2548                                // if both inputs are root, the output is expected to have streamy semantics, so we need
2549                                // a multiset_delta() to negate the replay behavior
2550                                parse_quote! {
2551                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2552                                    #left_ident -> [0]#stream_ident;
2553                                    #right_ident -> [1]#stream_ident;
2554                                }
2555                            } else {
2556                                parse_quote! {
2557                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2558                                    #left_ident -> [0]#stream_ident;
2559                                    #right_ident -> [1]#stream_ident;
2560                                }
2561                            }
2562                            ,
2563                            None,
2564                            Some(&next_stmt_id.to_string()),
2565                        );
2566                    }
2567                    BuildersOrCallback::Callback(_, node_callback) => {
2568                        node_callback(self, next_stmt_id);
2569                    }
2570                }
2571
2572                *next_stmt_id += 1;
2573
2574                stream_ident
2575            }
2576
2577            HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2578                let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2579                    parse_quote!(difference_multiset)
2580                } else {
2581                    parse_quote!(anti_join_multiset)
2582                };
2583
2584                let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2585                    self
2586                else {
2587                    unreachable!()
2588                };
2589
2590                let (neg, neg_lifetime) =
2591                    if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
2592                        debug_assert!(!neg.metadata().location_kind.is_top_level());
2593                        (neg, quote!('static))
2594                    } else if neg.metadata().location_kind.is_top_level() {
2595                        (neg, quote!('static))
2596                    } else {
2597                        (neg, quote!('tick))
2598                    };
2599
2600                let pos_ident = pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
2601                let neg_ident = neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
2602
2603                let stream_ident =
2604                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2605
2606                match builders_or_callback {
2607                    BuildersOrCallback::Builders(graph_builders) => {
2608                        let builder = graph_builders.get_dfir_mut(&out_location);
2609                        builder.add_dfir(
2610                            parse_quote! {
2611                                #stream_ident = #operator::<'tick, #neg_lifetime>();
2612                                #pos_ident -> [pos]#stream_ident;
2613                                #neg_ident -> [neg]#stream_ident;
2614                            },
2615                            None,
2616                            Some(&next_stmt_id.to_string()),
2617                        );
2618                    }
2619                    BuildersOrCallback::Callback(_, node_callback) => {
2620                        node_callback(self, next_stmt_id);
2621                    }
2622                }
2623
2624                *next_stmt_id += 1;
2625
2626                stream_ident
2627            }
2628
2629            HydroNode::ResolveFutures { input, .. } => {
2630                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2631
2632                let futures_ident =
2633                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2634
2635                match builders_or_callback {
2636                    BuildersOrCallback::Builders(graph_builders) => {
2637                        let builder = graph_builders.get_dfir_mut(&out_location);
2638                        builder.add_dfir(
2639                            parse_quote! {
2640                                #futures_ident = #input_ident -> resolve_futures();
2641                            },
2642                            None,
2643                            Some(&next_stmt_id.to_string()),
2644                        );
2645                    }
2646                    BuildersOrCallback::Callback(_, node_callback) => {
2647                        node_callback(self, next_stmt_id);
2648                    }
2649                }
2650
2651                *next_stmt_id += 1;
2652
2653                futures_ident
2654            }
2655
2656            HydroNode::ResolveFuturesOrdered { input, .. } => {
2657                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2658
2659                let futures_ident =
2660                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2661
2662                match builders_or_callback {
2663                    BuildersOrCallback::Builders(graph_builders) => {
2664                        let builder = graph_builders.get_dfir_mut(&out_location);
2665                        builder.add_dfir(
2666                            parse_quote! {
2667                                #futures_ident = #input_ident -> resolve_futures_ordered();
2668                            },
2669                            None,
2670                            Some(&next_stmt_id.to_string()),
2671                        );
2672                    }
2673                    BuildersOrCallback::Callback(_, node_callback) => {
2674                        node_callback(self, next_stmt_id);
2675                    }
2676                }
2677
2678                *next_stmt_id += 1;
2679
2680                futures_ident
2681            }
2682
2683            HydroNode::Map { f, input, .. } => {
2684                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2685
2686                let map_ident =
2687                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2688
2689                match builders_or_callback {
2690                    BuildersOrCallback::Builders(graph_builders) => {
2691                        let builder = graph_builders.get_dfir_mut(&out_location);
2692                        builder.add_dfir(
2693                            parse_quote! {
2694                                #map_ident = #input_ident -> map(#f);
2695                            },
2696                            None,
2697                            Some(&next_stmt_id.to_string()),
2698                        );
2699                    }
2700                    BuildersOrCallback::Callback(_, node_callback) => {
2701                        node_callback(self, next_stmt_id);
2702                    }
2703                }
2704
2705                *next_stmt_id += 1;
2706
2707                map_ident
2708            }
2709
2710            HydroNode::FlatMap { f, input, .. } => {
2711                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2712
2713                let flat_map_ident =
2714                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2715
2716                match builders_or_callback {
2717                    BuildersOrCallback::Builders(graph_builders) => {
2718                        let builder = graph_builders.get_dfir_mut(&out_location);
2719                        builder.add_dfir(
2720                            parse_quote! {
2721                                #flat_map_ident = #input_ident -> flat_map(#f);
2722                            },
2723                            None,
2724                            Some(&next_stmt_id.to_string()),
2725                        );
2726                    }
2727                    BuildersOrCallback::Callback(_, node_callback) => {
2728                        node_callback(self, next_stmt_id);
2729                    }
2730                }
2731
2732                *next_stmt_id += 1;
2733
2734                flat_map_ident
2735            }
2736
2737            HydroNode::Filter { f, input, .. } => {
2738                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2739
2740                let filter_ident =
2741                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2742
2743                match builders_or_callback {
2744                    BuildersOrCallback::Builders(graph_builders) => {
2745                        let builder = graph_builders.get_dfir_mut(&out_location);
2746                        builder.add_dfir(
2747                            parse_quote! {
2748                                #filter_ident = #input_ident -> filter(#f);
2749                            },
2750                            None,
2751                            Some(&next_stmt_id.to_string()),
2752                        );
2753                    }
2754                    BuildersOrCallback::Callback(_, node_callback) => {
2755                        node_callback(self, next_stmt_id);
2756                    }
2757                }
2758
2759                *next_stmt_id += 1;
2760
2761                filter_ident
2762            }
2763
2764            HydroNode::FilterMap { f, input, .. } => {
2765                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2766
2767                let filter_map_ident =
2768                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2769
2770                match builders_or_callback {
2771                    BuildersOrCallback::Builders(graph_builders) => {
2772                        let builder = graph_builders.get_dfir_mut(&out_location);
2773                        builder.add_dfir(
2774                            parse_quote! {
2775                                #filter_map_ident = #input_ident -> filter_map(#f);
2776                            },
2777                            None,
2778                            Some(&next_stmt_id.to_string()),
2779                        );
2780                    }
2781                    BuildersOrCallback::Callback(_, node_callback) => {
2782                        node_callback(self, next_stmt_id);
2783                    }
2784                }
2785
2786                *next_stmt_id += 1;
2787
2788                filter_map_ident
2789            }
2790
2791            HydroNode::Sort { input, .. } => {
2792                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2793
2794                let sort_ident =
2795                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2796
2797                match builders_or_callback {
2798                    BuildersOrCallback::Builders(graph_builders) => {
2799                        let builder = graph_builders.get_dfir_mut(&out_location);
2800                        builder.add_dfir(
2801                            parse_quote! {
2802                                #sort_ident = #input_ident -> sort();
2803                            },
2804                            None,
2805                            Some(&next_stmt_id.to_string()),
2806                        );
2807                    }
2808                    BuildersOrCallback::Callback(_, node_callback) => {
2809                        node_callback(self, next_stmt_id);
2810                    }
2811                }
2812
2813                *next_stmt_id += 1;
2814
2815                sort_ident
2816            }
2817
2818            HydroNode::DeferTick { input, .. } => {
2819                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2820
2821                let defer_tick_ident =
2822                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2823
2824                match builders_or_callback {
2825                    BuildersOrCallback::Builders(graph_builders) => {
2826                        let builder = graph_builders.get_dfir_mut(&out_location);
2827                        builder.add_dfir(
2828                            parse_quote! {
2829                                #defer_tick_ident = #input_ident -> defer_tick_lazy();
2830                            },
2831                            None,
2832                            Some(&next_stmt_id.to_string()),
2833                        );
2834                    }
2835                    BuildersOrCallback::Callback(_, node_callback) => {
2836                        node_callback(self, next_stmt_id);
2837                    }
2838                }
2839
2840                *next_stmt_id += 1;
2841
2842                defer_tick_ident
2843            }
2844
2845            HydroNode::Enumerate { input, .. } => {
2846                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2847
2848                let enumerate_ident =
2849                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2850
2851                match builders_or_callback {
2852                    BuildersOrCallback::Builders(graph_builders) => {
2853                        let builder = graph_builders.get_dfir_mut(&out_location);
2854                        let lifetime = if input.metadata().location_kind.is_top_level() {
2855                            quote!('static)
2856                        } else {
2857                            quote!('tick)
2858                        };
2859                        builder.add_dfir(
2860                            parse_quote! {
2861                                #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2862                            },
2863                            None,
2864                            Some(&next_stmt_id.to_string()),
2865                        );
2866                    }
2867                    BuildersOrCallback::Callback(_, node_callback) => {
2868                        node_callback(self, next_stmt_id);
2869                    }
2870                }
2871
2872                *next_stmt_id += 1;
2873
2874                enumerate_ident
2875            }
2876
2877            HydroNode::Inspect { f, input, .. } => {
2878                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2879
2880                let inspect_ident =
2881                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2882
2883                match builders_or_callback {
2884                    BuildersOrCallback::Builders(graph_builders) => {
2885                        let builder = graph_builders.get_dfir_mut(&out_location);
2886                        builder.add_dfir(
2887                            parse_quote! {
2888                                #inspect_ident = #input_ident -> inspect(#f);
2889                            },
2890                            None,
2891                            Some(&next_stmt_id.to_string()),
2892                        );
2893                    }
2894                    BuildersOrCallback::Callback(_, node_callback) => {
2895                        node_callback(self, next_stmt_id);
2896                    }
2897                }
2898
2899                *next_stmt_id += 1;
2900
2901                inspect_ident
2902            }
2903
2904            HydroNode::Unique { input, .. } => {
2905                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2906
2907                let unique_ident =
2908                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2909
2910                match builders_or_callback {
2911                    BuildersOrCallback::Builders(graph_builders) => {
2912                        let builder = graph_builders.get_dfir_mut(&out_location);
2913                        let lifetime = if input.metadata().location_kind.is_top_level() {
2914                            quote!('static)
2915                        } else {
2916                            quote!('tick)
2917                        };
2918
2919                        builder.add_dfir(
2920                            parse_quote! {
2921                                #unique_ident = #input_ident -> unique::<#lifetime>();
2922                            },
2923                            None,
2924                            Some(&next_stmt_id.to_string()),
2925                        );
2926                    }
2927                    BuildersOrCallback::Callback(_, node_callback) => {
2928                        node_callback(self, next_stmt_id);
2929                    }
2930                }
2931
2932                *next_stmt_id += 1;
2933
2934                unique_ident
2935            }
2936
2937            HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2938                let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2939                    parse_quote!(fold)
2940                } else if matches!(self, HydroNode::Scan { .. }) {
2941                    parse_quote!(scan)
2942                } else {
2943                    parse_quote!(fold_keyed)
2944                };
2945
2946                let (HydroNode::Fold { input, .. }
2947                | HydroNode::FoldKeyed { input, .. }
2948                | HydroNode::Scan { input, .. }) = self
2949                else {
2950                    unreachable!()
2951                };
2952
2953                let (input, lifetime) =
2954                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2955                        debug_assert!(!input.metadata().location_kind.is_top_level());
2956                        (input, quote!('static))
2957                    } else if input.metadata().location_kind.is_top_level() {
2958                        (input, quote!('static))
2959                    } else {
2960                        (input, quote!('tick))
2961                    };
2962
2963                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2964
2965                let (HydroNode::Fold { init, acc, .. }
2966                | HydroNode::FoldKeyed { init, acc, .. }
2967                | HydroNode::Scan { init, acc, .. }) = &*self
2968                else {
2969                    unreachable!()
2970                };
2971
2972                let fold_ident =
2973                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2974
2975                match builders_or_callback {
2976                    BuildersOrCallback::Builders(graph_builders) => {
2977                        if matches!(self, HydroNode::Fold { .. })
2978                            && self.metadata().location_kind.is_top_level()
2979                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2980                            && graph_builders.singleton_intermediates()
2981                        {
2982                            let builder = graph_builders.get_dfir_mut(&out_location);
2983
2984                            let acc: syn::Expr = parse_quote!({
2985                                let mut __inner = #acc;
2986                                move |__state, __value| {
2987                                    __inner(__state, __value);
2988                                    Some(__state.clone())
2989                                }
2990                            });
2991
2992                            builder.add_dfir(
2993                                parse_quote! {
2994                                    source_iter([(#init)()]) -> [0]#fold_ident;
2995                                    #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
2996                                    #fold_ident = chain();
2997                                },
2998                                None,
2999                                Some(&next_stmt_id.to_string()),
3000                            );
3001                        } else if matches!(self, HydroNode::FoldKeyed { .. })
3002                            && self.metadata().location_kind.is_top_level()
3003                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3004                            && graph_builders.singleton_intermediates()
3005                        {
3006                            let builder = graph_builders.get_dfir_mut(&out_location);
3007
3008                            let acc: syn::Expr = parse_quote!({
3009                                let mut __init = #init;
3010                                let mut __inner = #acc;
3011                                move |__state, (__key, __value)| {
3012                                    // TODO(shadaj): we can avoid the clone when the entry exists
3013                                    let __state = __state.entry(__key.clone()).or_insert_with(|| (__init)());
3014                                    __inner(__state, __value);
3015                                    Some((__key, __state.clone()))
3016                                }
3017                            });
3018
3019                            builder.add_dfir(
3020                                parse_quote! {
3021                                    source_iter([(#init)()]) -> [0]#fold_ident;
3022                                    #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3023                                },
3024                                None,
3025                                Some(&next_stmt_id.to_string()),
3026                            );
3027                        } else {
3028                            let builder = graph_builders.get_dfir_mut(&out_location);
3029                            builder.add_dfir(
3030                                parse_quote! {
3031                                    #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3032                                },
3033                                None,
3034                                Some(&next_stmt_id.to_string()),
3035                            );
3036                        }
3037                    }
3038                    BuildersOrCallback::Callback(_, node_callback) => {
3039                        node_callback(self, next_stmt_id);
3040                    }
3041                }
3042
3043                *next_stmt_id += 1;
3044
3045                fold_ident
3046            }
3047
3048            HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3049                let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
3050                    parse_quote!(reduce)
3051                } else {
3052                    parse_quote!(reduce_keyed)
3053                };
3054
3055                let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = self
3056                else {
3057                    unreachable!()
3058                };
3059
3060                let (input, lifetime) =
3061                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
3062                        debug_assert!(!input.metadata().location_kind.is_top_level());
3063                        (input, quote!('static))
3064                    } else if input.metadata().location_kind.is_top_level() {
3065                        (input, quote!('static))
3066                    } else {
3067                        (input, quote!('tick))
3068                    };
3069
3070                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3071
3072                let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*self
3073                else {
3074                    unreachable!()
3075                };
3076
3077                let reduce_ident =
3078                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3079
3080                match builders_or_callback {
3081                    BuildersOrCallback::Builders(graph_builders) => {
3082                        if matches!(self, HydroNode::Reduce { .. })
3083                            && self.metadata().location_kind.is_top_level()
3084                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3085                            && graph_builders.singleton_intermediates()
3086                        {
3087                            todo!(
3088                                "Reduce with optional intermediates is not yet supported in simulator"
3089                            );
3090                        } else if matches!(self, HydroNode::ReduceKeyed { .. })
3091                            && self.metadata().location_kind.is_top_level()
3092                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3093                            && graph_builders.singleton_intermediates()
3094                        {
3095                            todo!();
3096                        } else {
3097                            let builder = graph_builders.get_dfir_mut(&out_location);
3098                            builder.add_dfir(
3099                                parse_quote! {
3100                                    #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3101                                },
3102                                None,
3103                                Some(&next_stmt_id.to_string()),
3104                            );
3105                        }
3106                    }
3107                    BuildersOrCallback::Callback(_, node_callback) => {
3108                        node_callback(self, next_stmt_id);
3109                    }
3110                }
3111
3112                *next_stmt_id += 1;
3113
3114                reduce_ident
3115            }
3116
3117            HydroNode::ReduceKeyedWatermark {
3118                f,
3119                input,
3120                watermark,
3121                ..
3122            } => {
3123                let (input, lifetime) =
3124                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
3125                        debug_assert!(!input.metadata().location_kind.is_top_level());
3126                        (input, quote!('static))
3127                    } else if input.metadata().location_kind.is_top_level() {
3128                        (input, quote!('static))
3129                    } else {
3130                        (input, quote!('tick))
3131                    };
3132
3133                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3134
3135                let watermark_ident =
3136                    watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
3137
3138                let chain_ident = syn::Ident::new(
3139                    &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3140                    Span::call_site(),
3141                );
3142
3143                let fold_ident =
3144                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3145
3146                match builders_or_callback {
3147                    BuildersOrCallback::Builders(graph_builders) => {
3148                        let builder = graph_builders.get_dfir_mut(&out_location);
3149                        // 1. Don't allow any values to be added to the map if the key <=the watermark
3150                        // 2. If the entry didn't exist in the BTreeMap, add it. Otherwise, call f.
3151                        //    If the watermark changed, delete all BTreeMap entries with a key < the watermark.
3152                        // 3. Convert the BTreeMap back into a stream of (k, v)
3153                        builder.add_dfir(
3154                            parse_quote! {
3155                                #chain_ident = chain();
3156                                #input_ident
3157                                    -> map(|x| (Some(x), None))
3158                                    -> [0]#chain_ident;
3159                                #watermark_ident
3160                                    -> map(|watermark| (None, Some(watermark)))
3161                                    -> [1]#chain_ident;
3162
3163                                #fold_ident = #chain_ident
3164                                    -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3165                                        let __reduce_keyed_fn = #f;
3166                                        move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3167                                            if let Some((k, v)) = opt_payload {
3168                                                if let Some(curr_watermark) = *opt_curr_watermark {
3169                                                    if k <= curr_watermark {
3170                                                        return;
3171                                                    }
3172                                                }
3173                                                match map.entry(k) {
3174                                                    ::std::collections::hash_map::Entry::Vacant(e) => {
3175                                                        e.insert(v);
3176                                                    }
3177                                                    ::std::collections::hash_map::Entry::Occupied(mut e) => {
3178                                                        __reduce_keyed_fn(e.get_mut(), v);
3179                                                    }
3180                                                }
3181                                            } else {
3182                                                let watermark = opt_watermark.unwrap();
3183                                                if let Some(curr_watermark) = *opt_curr_watermark {
3184                                                    if watermark <= curr_watermark {
3185                                                        return;
3186                                                    }
3187                                                }
3188                                                *opt_curr_watermark = opt_watermark;
3189                                                map.retain(|k, _| *k > watermark);
3190                                            }
3191                                        }
3192                                    })
3193                                    -> flat_map(|(map, _curr_watermark)| map);
3194                            },
3195                            None,
3196                            Some(&next_stmt_id.to_string()),
3197                        );
3198                    }
3199                    BuildersOrCallback::Callback(_, node_callback) => {
3200                        node_callback(self, next_stmt_id);
3201                    }
3202                }
3203
3204                *next_stmt_id += 1;
3205
3206                fold_ident
3207            }
3208
3209            HydroNode::Network {
3210                serialize_fn: serialize_pipeline,
3211                instantiate_fn,
3212                deserialize_fn: deserialize_pipeline,
3213                input,
3214                ..
3215            } => {
3216                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3217
3218                let receiver_stream_ident =
3219                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3220
3221                match builders_or_callback {
3222                    BuildersOrCallback::Builders(graph_builders) => {
3223                        let (sink_expr, source_expr) = match instantiate_fn {
3224                            DebugInstantiate::Building => (
3225                                syn::parse_quote!(DUMMY_SINK),
3226                                syn::parse_quote!(DUMMY_SOURCE),
3227                            ),
3228
3229                            DebugInstantiate::Finalized(finalized) => {
3230                                (finalized.sink.clone(), finalized.source.clone())
3231                            }
3232                        };
3233
3234                        graph_builders.create_network(
3235                            &input.metadata().location_kind,
3236                            &out_location,
3237                            input_ident,
3238                            &receiver_stream_ident,
3239                            serialize_pipeline,
3240                            sink_expr,
3241                            source_expr,
3242                            deserialize_pipeline,
3243                            *next_stmt_id,
3244                        );
3245                    }
3246                    BuildersOrCallback::Callback(_, node_callback) => {
3247                        node_callback(self, next_stmt_id);
3248                    }
3249                }
3250
3251                *next_stmt_id += 1;
3252
3253                receiver_stream_ident
3254            }
3255
3256            HydroNode::ExternalInput {
3257                instantiate_fn,
3258                deserialize_fn: deserialize_pipeline,
3259                ..
3260            } => {
3261                let receiver_stream_ident =
3262                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3263
3264                match builders_or_callback {
3265                    BuildersOrCallback::Builders(graph_builders) => {
3266                        let (_, source_expr) = match instantiate_fn {
3267                            DebugInstantiate::Building => (
3268                                syn::parse_quote!(DUMMY_SINK),
3269                                syn::parse_quote!(DUMMY_SOURCE),
3270                            ),
3271
3272                            DebugInstantiate::Finalized(finalized) => {
3273                                (finalized.sink.clone(), finalized.source.clone())
3274                            }
3275                        };
3276
3277                        graph_builders.create_external_source(
3278                            &out_location,
3279                            source_expr,
3280                            &receiver_stream_ident,
3281                            deserialize_pipeline,
3282                            *next_stmt_id,
3283                        );
3284                    }
3285                    BuildersOrCallback::Callback(_, node_callback) => {
3286                        node_callback(self, next_stmt_id);
3287                    }
3288                }
3289
3290                *next_stmt_id += 1;
3291
3292                receiver_stream_ident
3293            }
3294
3295            HydroNode::Counter {
3296                tag,
3297                duration,
3298                prefix,
3299                input,
3300                ..
3301            } => {
3302                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3303
3304                let counter_ident =
3305                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3306
3307                match builders_or_callback {
3308                    BuildersOrCallback::Builders(graph_builders) => {
3309                        let builder = graph_builders.get_dfir_mut(&out_location);
3310                        builder.add_dfir(
3311                            parse_quote! {
3312                                #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3313                            },
3314                            None,
3315                            Some(&next_stmt_id.to_string()),
3316                        );
3317                    }
3318                    BuildersOrCallback::Callback(_, node_callback) => {
3319                        node_callback(self, next_stmt_id);
3320                    }
3321                }
3322
3323                *next_stmt_id += 1;
3324
3325                counter_ident
3326            }
3327        }
3328    }
3329
3330    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3331        match self {
3332            HydroNode::Placeholder => {
3333                panic!()
3334            }
3335            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3336            HydroNode::Source { source, .. } => match source {
3337                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3338                HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
3339            },
3340            HydroNode::SingletonSource { value, .. } => {
3341                transform(value);
3342            }
3343            HydroNode::CycleSource { .. }
3344            | HydroNode::Tee { .. }
3345            | HydroNode::Persist { .. }
3346            | HydroNode::YieldConcat { .. }
3347            | HydroNode::BeginAtomic { .. }
3348            | HydroNode::EndAtomic { .. }
3349            | HydroNode::Batch { .. }
3350            | HydroNode::Chain { .. }
3351            | HydroNode::ChainFirst { .. }
3352            | HydroNode::CrossProduct { .. }
3353            | HydroNode::CrossSingleton { .. }
3354            | HydroNode::ResolveFutures { .. }
3355            | HydroNode::ResolveFuturesOrdered { .. }
3356            | HydroNode::Join { .. }
3357            | HydroNode::Difference { .. }
3358            | HydroNode::AntiJoin { .. }
3359            | HydroNode::DeferTick { .. }
3360            | HydroNode::Enumerate { .. }
3361            | HydroNode::Unique { .. }
3362            | HydroNode::Sort { .. } => {}
3363            HydroNode::Map { f, .. }
3364            | HydroNode::FlatMap { f, .. }
3365            | HydroNode::Filter { f, .. }
3366            | HydroNode::FilterMap { f, .. }
3367            | HydroNode::Inspect { f, .. }
3368            | HydroNode::Reduce { f, .. }
3369            | HydroNode::ReduceKeyed { f, .. }
3370            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3371                transform(f);
3372            }
3373            HydroNode::Fold { init, acc, .. }
3374            | HydroNode::Scan { init, acc, .. }
3375            | HydroNode::FoldKeyed { init, acc, .. } => {
3376                transform(init);
3377                transform(acc);
3378            }
3379            HydroNode::Network {
3380                serialize_fn,
3381                deserialize_fn,
3382                ..
3383            } => {
3384                if let Some(serialize_fn) = serialize_fn {
3385                    transform(serialize_fn);
3386                }
3387                if let Some(deserialize_fn) = deserialize_fn {
3388                    transform(deserialize_fn);
3389                }
3390            }
3391            HydroNode::ExternalInput { deserialize_fn, .. } => {
3392                if let Some(deserialize_fn) = deserialize_fn {
3393                    transform(deserialize_fn);
3394                }
3395            }
3396            HydroNode::Counter { duration, .. } => {
3397                transform(duration);
3398            }
3399        }
3400    }
3401
3402    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3403        &self.metadata().op
3404    }
3405
3406    pub fn metadata(&self) -> &HydroIrMetadata {
3407        match self {
3408            HydroNode::Placeholder => {
3409                panic!()
3410            }
3411            HydroNode::Cast { metadata, .. } => metadata,
3412            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3413            HydroNode::Source { metadata, .. } => metadata,
3414            HydroNode::SingletonSource { metadata, .. } => metadata,
3415            HydroNode::CycleSource { metadata, .. } => metadata,
3416            HydroNode::Tee { metadata, .. } => metadata,
3417            HydroNode::Persist { metadata, .. } => metadata,
3418            HydroNode::YieldConcat { metadata, .. } => metadata,
3419            HydroNode::BeginAtomic { metadata, .. } => metadata,
3420            HydroNode::EndAtomic { metadata, .. } => metadata,
3421            HydroNode::Batch { metadata, .. } => metadata,
3422            HydroNode::Chain { metadata, .. } => metadata,
3423            HydroNode::ChainFirst { metadata, .. } => metadata,
3424            HydroNode::CrossProduct { metadata, .. } => metadata,
3425            HydroNode::CrossSingleton { metadata, .. } => metadata,
3426            HydroNode::Join { metadata, .. } => metadata,
3427            HydroNode::Difference { metadata, .. } => metadata,
3428            HydroNode::AntiJoin { metadata, .. } => metadata,
3429            HydroNode::ResolveFutures { metadata, .. } => metadata,
3430            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3431            HydroNode::Map { metadata, .. } => metadata,
3432            HydroNode::FlatMap { metadata, .. } => metadata,
3433            HydroNode::Filter { metadata, .. } => metadata,
3434            HydroNode::FilterMap { metadata, .. } => metadata,
3435            HydroNode::DeferTick { metadata, .. } => metadata,
3436            HydroNode::Enumerate { metadata, .. } => metadata,
3437            HydroNode::Inspect { metadata, .. } => metadata,
3438            HydroNode::Unique { metadata, .. } => metadata,
3439            HydroNode::Sort { metadata, .. } => metadata,
3440            HydroNode::Scan { metadata, .. } => metadata,
3441            HydroNode::Fold { metadata, .. } => metadata,
3442            HydroNode::FoldKeyed { metadata, .. } => metadata,
3443            HydroNode::Reduce { metadata, .. } => metadata,
3444            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3445            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3446            HydroNode::ExternalInput { metadata, .. } => metadata,
3447            HydroNode::Network { metadata, .. } => metadata,
3448            HydroNode::Counter { metadata, .. } => metadata,
3449        }
3450    }
3451
3452    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3453        &mut self.metadata_mut().op
3454    }
3455
3456    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3457        match self {
3458            HydroNode::Placeholder => {
3459                panic!()
3460            }
3461            HydroNode::Cast { metadata, .. } => metadata,
3462            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3463            HydroNode::Source { metadata, .. } => metadata,
3464            HydroNode::SingletonSource { metadata, .. } => metadata,
3465            HydroNode::CycleSource { metadata, .. } => metadata,
3466            HydroNode::Tee { metadata, .. } => metadata,
3467            HydroNode::Persist { metadata, .. } => metadata,
3468            HydroNode::YieldConcat { metadata, .. } => metadata,
3469            HydroNode::BeginAtomic { metadata, .. } => metadata,
3470            HydroNode::EndAtomic { metadata, .. } => metadata,
3471            HydroNode::Batch { metadata, .. } => metadata,
3472            HydroNode::Chain { metadata, .. } => metadata,
3473            HydroNode::ChainFirst { metadata, .. } => metadata,
3474            HydroNode::CrossProduct { metadata, .. } => metadata,
3475            HydroNode::CrossSingleton { metadata, .. } => metadata,
3476            HydroNode::Join { metadata, .. } => metadata,
3477            HydroNode::Difference { metadata, .. } => metadata,
3478            HydroNode::AntiJoin { metadata, .. } => metadata,
3479            HydroNode::ResolveFutures { metadata, .. } => metadata,
3480            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3481            HydroNode::Map { metadata, .. } => metadata,
3482            HydroNode::FlatMap { metadata, .. } => metadata,
3483            HydroNode::Filter { metadata, .. } => metadata,
3484            HydroNode::FilterMap { metadata, .. } => metadata,
3485            HydroNode::DeferTick { metadata, .. } => metadata,
3486            HydroNode::Enumerate { metadata, .. } => metadata,
3487            HydroNode::Inspect { metadata, .. } => metadata,
3488            HydroNode::Unique { metadata, .. } => metadata,
3489            HydroNode::Sort { metadata, .. } => metadata,
3490            HydroNode::Scan { metadata, .. } => metadata,
3491            HydroNode::Fold { metadata, .. } => metadata,
3492            HydroNode::FoldKeyed { metadata, .. } => metadata,
3493            HydroNode::Reduce { metadata, .. } => metadata,
3494            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3495            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3496            HydroNode::ExternalInput { metadata, .. } => metadata,
3497            HydroNode::Network { metadata, .. } => metadata,
3498            HydroNode::Counter { metadata, .. } => metadata,
3499        }
3500    }
3501
3502    pub fn input(&self) -> Vec<&HydroNode> {
3503        match self {
3504            HydroNode::Placeholder => {
3505                panic!()
3506            }
3507            HydroNode::Source { .. }
3508            | HydroNode::SingletonSource { .. }
3509            | HydroNode::ExternalInput { .. }
3510            | HydroNode::CycleSource { .. }
3511            | HydroNode::Tee { .. } => {
3512                // Tee should find its input in separate special ways
3513                vec![]
3514            }
3515            HydroNode::Cast { inner, .. }
3516            | HydroNode::ObserveNonDet { inner, .. }
3517            | HydroNode::Persist { inner, .. }
3518            | HydroNode::YieldConcat { inner, .. }
3519            | HydroNode::BeginAtomic { inner, .. }
3520            | HydroNode::EndAtomic { inner, .. }
3521            | HydroNode::Batch { inner, .. } => {
3522                vec![inner]
3523            }
3524            HydroNode::Chain { first, second, .. } => {
3525                vec![first, second]
3526            }
3527            HydroNode::ChainFirst { first, second, .. } => {
3528                vec![first, second]
3529            }
3530            HydroNode::CrossProduct { left, right, .. }
3531            | HydroNode::CrossSingleton { left, right, .. }
3532            | HydroNode::Join { left, right, .. } => {
3533                vec![left, right]
3534            }
3535            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3536                vec![pos, neg]
3537            }
3538            HydroNode::Map { input, .. }
3539            | HydroNode::FlatMap { input, .. }
3540            | HydroNode::Filter { input, .. }
3541            | HydroNode::FilterMap { input, .. }
3542            | HydroNode::Sort { input, .. }
3543            | HydroNode::DeferTick { input, .. }
3544            | HydroNode::Enumerate { input, .. }
3545            | HydroNode::Inspect { input, .. }
3546            | HydroNode::Unique { input, .. }
3547            | HydroNode::Network { input, .. }
3548            | HydroNode::Counter { input, .. }
3549            | HydroNode::ResolveFutures { input, .. }
3550            | HydroNode::ResolveFuturesOrdered { input, .. } => {
3551                vec![input]
3552            }
3553            HydroNode::Fold { input, .. }
3554            | HydroNode::FoldKeyed { input, .. }
3555            | HydroNode::Reduce { input, .. }
3556            | HydroNode::ReduceKeyed { input, .. }
3557            | HydroNode::Scan { input, .. } => {
3558                // Skip persist before fold/reduce
3559                if let HydroNode::Persist { inner, .. } = input.as_ref() {
3560                    vec![inner]
3561                } else {
3562                    vec![input]
3563                }
3564            }
3565            HydroNode::ReduceKeyedWatermark {
3566                input, watermark, ..
3567            } => {
3568                // Skip persist before fold/reduce
3569                if let HydroNode::Persist { inner, .. } = input.as_ref() {
3570                    vec![inner, watermark]
3571                } else {
3572                    vec![input, watermark]
3573                }
3574            }
3575        }
3576    }
3577
3578    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3579        self.input()
3580            .iter()
3581            .map(|input_node| input_node.metadata())
3582            .collect()
3583    }
3584
3585    pub fn print_root(&self) -> String {
3586        match self {
3587            HydroNode::Placeholder => {
3588                panic!()
3589            }
3590            HydroNode::Cast { .. } => "Cast()".to_string(),
3591            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3592            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3593            HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3594            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3595            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3596            HydroNode::Persist { .. } => "Persist()".to_string(),
3597            HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3598            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3599            HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3600            HydroNode::Batch { .. } => "Batch()".to_string(),
3601            HydroNode::Chain { first, second, .. } => {
3602                format!("Chain({}, {})", first.print_root(), second.print_root())
3603            }
3604            HydroNode::ChainFirst { first, second, .. } => {
3605                format!(
3606                    "ChainFirst({}, {})",
3607                    first.print_root(),
3608                    second.print_root()
3609                )
3610            }
3611            HydroNode::CrossProduct { left, right, .. } => {
3612                format!(
3613                    "CrossProduct({}, {})",
3614                    left.print_root(),
3615                    right.print_root()
3616                )
3617            }
3618            HydroNode::CrossSingleton { left, right, .. } => {
3619                format!(
3620                    "CrossSingleton({}, {})",
3621                    left.print_root(),
3622                    right.print_root()
3623                )
3624            }
3625            HydroNode::Join { left, right, .. } => {
3626                format!("Join({}, {})", left.print_root(), right.print_root())
3627            }
3628            HydroNode::Difference { pos, neg, .. } => {
3629                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3630            }
3631            HydroNode::AntiJoin { pos, neg, .. } => {
3632                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3633            }
3634            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3635            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3636            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3637            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3638            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3639            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3640            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3641            HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3642            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3643            HydroNode::Unique { .. } => "Unique()".to_string(),
3644            HydroNode::Sort { .. } => "Sort()".to_string(),
3645            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3646            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3647            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3648            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3649            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3650            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3651            HydroNode::Network { .. } => "Network()".to_string(),
3652            HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3653            HydroNode::Counter { tag, duration, .. } => {
3654                format!("Counter({:?}, {:?})", tag, duration)
3655            }
3656        }
3657    }
3658}
3659
/// Sets up a network edge from `from_location` to `to_location`.
///
/// Both endpoints are looked up in `processes` / `clusters` by their id, one port is
/// allocated on each endpoint through `D`, and the matching `D::*_sink_source` /
/// `D::*_connect` pair is invoked (`o2o`, `o2m`, `m2o` or `m2m`, depending on whether
/// each side is a process or a cluster).
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by
/// `D::*_sink_source` and the closure produced by `D::*_connect`.
///
/// # Panics
///
/// * If a referenced process or cluster id is missing from `processes` / `clusters`.
/// * If either location is a `Tick` or `Atomic` location; only `Process` and `Cluster`
///   locations are handled as network endpoints here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    compile_env: &D::CompileEnv,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches (a clone of) the instantiated process with the given id.
    let process = |id: &usize| {
        processes
            .get(id)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", id))
            .clone()
    };
    // Fetches (a clone of) the instantiated cluster with the given id.
    let cluster = |id: &usize| {
        clusters
            .get(id)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", id))
            .clone()
    };

    // In every arm the sender is resolved before the receiver, and the sink port is
    // allocated before the source port.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let from_node = process(from);
            let to_node = process(to);

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let from_node = process(from);
            let to_node = cluster(to);

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let from_node = cluster(from);
            let to_node = process(to);

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let from_node = cluster(from);
            let to_node = cluster(to);

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Tick(_, _), _) | (LocationId::Atomic(_), _) => panic!(
            "Cannot instantiate a network whose sending location is a Tick or Atomic location; \
             only Process and Cluster locations are supported"
        ),
        (_, LocationId::Tick(_, _)) | (_, LocationId::Atomic(_)) => panic!(
            "Cannot instantiate a network whose receiving location is a Tick or Atomic location; \
             only Process and Cluster locations are supported"
        ),
    };

    (sink, source, connect_fn)
}
3767
// Unit tests for this module: two layout-size assertions and three checks of
// `simplify_q_macro` (one equality check, two snapshot checks).
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Pins the in-memory size of `HydroNode` to an exact byte count.
    // NOTE(review): presumably meant to catch accidental growth of the enum; the
    // constant has to be updated whenever the layout changes on purpose.
    #[test]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 264);
    }

    // Pins the in-memory size of `HydroRoot` to an exact byte count (see the note on
    // `hydro_node_size`).
    #[test]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 160);
    }

    // An expression that contains no `q!` call must come back from
    // `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        // Test basic non-q! expression
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Runs `simplify_q_macro` on the expression spliced from a real `q!` closure and
    // compares the resulting token string against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Test a simplified version of what a real stageleft call might look like
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        // This should be processed by our visitor and simplified to q!(...)
        // since we detect the stageleft::runtime_support::fn_* pattern
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Same snapshot check, but the quoted code is a block that binds a local and then
    // yields a `move` closure, so the quoted text does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        // Test a closure that does not start with a pipe
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}