hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
/// Wrapper that displays only the tokens of a parsed expr.
///
/// The `Debug` impl prints the raw token stream, while the `Display` impl first runs the
/// expression through `simplify_q_macro` and wraps the result in `q!(...)`.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39    fn from(expr: syn::Expr) -> Self {
40        Self(Box::new(expr))
41    }
42}
43
44impl Deref for DebugExpr {
45    type Target = syn::Expr;
46
47    fn deref(&self) -> &Self::Target {
48        &self.0
49    }
50}
51
52impl ToTokens for DebugExpr {
53    fn to_tokens(&self, tokens: &mut TokenStream) {
54        self.0.to_tokens(tokens);
55    }
56}
57
58impl Debug for DebugExpr {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        write!(f, "{}", self.0.to_token_stream())
61    }
62}
63
64impl Display for DebugExpr {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        let original = self.0.as_ref().clone();
67        let simplified = simplify_q_macro(original);
68
69        // For now, just use quote formatting without trying to parse as a statement
70        // This avoids the syn::parse_quote! issues entirely
71        write!(f, "q!({})", quote::quote!(#simplified))
72    }
73}
74
75/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77    // Try to parse the token string as a syn::Expr
78    // Use a visitor to simplify q! macro expansions
79    let mut simplifier = QMacroSimplifier::new();
80    simplifier.visit_expr_mut(&mut expr);
81
82    // If we found and simplified a q! macro, return the simplified version
83    if let Some(simplified) = simplifier.simplified_result {
84        simplified
85    } else {
86        expr
87    }
88}
89
/// AST visitor that simplifies q! macro expansions
///
/// The visitor does not rewrite the tree it walks; it records the first replacement it
/// finds in `simplified_result` and stops looking afterwards.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The closure (or enclosing block) extracted from the first matching
    /// `stageleft::runtime_support::*_type_hint*` call, or `None` if none was seen yet.
    pub simplified_result: Option<syn::Expr>,
}
95
96impl QMacroSimplifier {
97    pub fn new() -> Self {
98        Self::default()
99    }
100}
101
102impl VisitMut for QMacroSimplifier {
103    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104        // Check if we already found a result to avoid further processing
105        if self.simplified_result.is_some() {
106            return;
107        }
108
109        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110            // Look for calls to stageleft::runtime_support::fn*
111            && self.is_stageleft_runtime_support_call(&path_expr.path)
112            // Try to extract the closure from the arguments
113            && let Some(closure) = self.extract_closure_from_args(&call.args)
114        {
115            self.simplified_result = Some(closure);
116            return;
117        }
118
119        // Continue visiting child expressions using the default implementation
120        // Use the default visitor to avoid infinite recursion
121        syn::visit_mut::visit_expr_mut(self, expr);
122    }
123}
124
125impl QMacroSimplifier {
126    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127        // Check if this is a call to stageleft::runtime_support::fn*
128        if let Some(last_segment) = path.segments.last() {
129            let fn_name = last_segment.ident.to_string();
130            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
131            fn_name.contains("_type_hint")
132                && path.segments.len() > 2
133                && path.segments[0].ident == "stageleft"
134                && path.segments[1].ident == "runtime_support"
135        } else {
136            false
137        }
138    }
139
140    fn extract_closure_from_args(
141        &self,
142        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143    ) -> Option<syn::Expr> {
144        // Look through the arguments for a closure expression
145        for arg in args {
146            if let syn::Expr::Closure(_) = arg {
147                return Some(arg.clone());
148            }
149            // Also check for closures nested in other expressions (like blocks)
150            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151                return Some(closure_expr);
152            }
153        }
154        None
155    }
156
157    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158        let mut visitor = ClosureFinder {
159            found_closure: None,
160            prefer_inner_blocks: true,
161        };
162        visitor.visit_expr(expr);
163        visitor.found_closure
164    }
165}
166
/// Visitor that finds closures in expressions with special block handling
struct ClosureFinder {
    /// The first match captured so far: either a closure expression, or (when
    /// `prefer_inner_blocks` is set) a nested block expression that contains one.
    found_closure: Option<syn::Expr>,
    /// When `true`, visiting a block first checks its directly nested block statements and
    /// captures the whole nested block if it contains a closure.
    prefer_inner_blocks: bool,
}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175        // If we already found a closure, don't continue searching
176        if self.found_closure.is_some() {
177            return;
178        }
179
180        match expr {
181            syn::Expr::Closure(_) => {
182                self.found_closure = Some(expr.clone());
183            }
184            syn::Expr::Block(block) if self.prefer_inner_blocks => {
185                // Special handling for blocks - look for inner blocks that contain closures
186                for stmt in &block.block.stmts {
187                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
188                        && let syn::Expr::Block(_) = stmt_expr
189                    {
190                        // Check if this nested block contains a closure
191                        let mut inner_visitor = ClosureFinder {
192                            found_closure: None,
193                            prefer_inner_blocks: false, // Avoid infinite recursion
194                        };
195                        inner_visitor.visit_expr(stmt_expr);
196                        if inner_visitor.found_closure.is_some() {
197                            // Found a closure in an inner block, return that block
198                            self.found_closure = Some(stmt_expr.clone());
199                            return;
200                        }
201                    }
202                }
203
204                // If no inner block with closure found, continue with normal visitation
205                visit::visit_expr(self, expr);
206
207                // If we found a closure, just return the closure itself, not the whole block
208                // unless we're in the special case where we want the containing block
209                if self.found_closure.is_some() {
210                    // The closure was found during visitation, no need to wrap in block
211                }
212            }
213            _ => {
214                // Use default visitor behavior for all other expressions
215                visit::visit_expr(self, expr);
216            }
217        }
218    }
219}
220
/// Debug displays the type's tokens.
///
/// Dereferences to the wrapped `syn::Type` and forwards `ToTokens` to it.
///
/// Boxes `syn::Type` which is ~320 bytes.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228    fn from(t: syn::Type) -> Self {
229        Self(Box::new(t))
230    }
231}
232
233impl Deref for DebugType {
234    type Target = syn::Type;
235
236    fn deref(&self) -> &Self::Target {
237        &self.0
238    }
239}
240
241impl ToTokens for DebugType {
242    fn to_tokens(&self, tokens: &mut TokenStream) {
243        self.0.to_tokens(tokens);
244    }
245}
246
247impl Debug for DebugType {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        write!(f, "{}", self.0.to_token_stream())
250    }
251}
252
/// Instantiation state of a network-like IR element.
///
/// Elements start out as `Building`; `compile_network` replaces that with `Finalized`, and
/// `connect_network` panics on anything still in the `Building` state.
pub enum DebugInstantiate {
    /// The network has not been instantiated yet.
    Building,
    /// The network has been instantiated; the payload is boxed.
    Finalized(Box<DebugInstantiateFinalized>),
}
257
/// The pieces produced when a network is instantiated by `compile_network`.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side of the connection.
    sink: syn::Expr,
    /// Expression for the receiving side of the connection.
    source: syn::Expr,
    /// Deferred connection step. It is an `Option` so that `connect_network` can `take()` it
    /// and call it exactly once.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272    fn from(f: DebugInstantiateFinalized) -> Self {
273        Self::Finalized(Box::new(f))
274    }
275}
276
277impl Debug for DebugInstantiate {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        write!(f, "<network instantiate>")
280    }
281}
282
283impl Hash for DebugInstantiate {
284    fn hash<H: Hasher>(&self, _state: &mut H) {
285        // Do nothing
286    }
287}
288
289impl Clone for DebugInstantiate {
290    fn clone(&self) -> Self {
291        match self {
292            DebugInstantiate::Building => DebugInstantiate::Building,
293            DebugInstantiate::Finalized(_) => {
294                panic!("DebugInstantiate::Finalized should not be cloned")
295            }
296        }
297    }
298}
299
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression (presumably one evaluating to a stream — confirm).
    Stream(DebugExpr),
    /// Source with no payload (presumably data arriving from an external network — confirm).
    ExternalNetwork(),
    /// Source described by an expression (presumably one evaluating to an iterator — confirm).
    Iter(DebugExpr),
    /// Source with no payload (NOTE(review): exact semantics are not visible here).
    Spin(),
    /// Source tied to the given location (presumably the members of that cluster — confirm).
    ClusterMembers(LocationId),
}
309
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code connecting `in_ident` (at `in_location`) to `out_ident` (at
    /// `out_location`) for a batch operation.
    ///
    /// The `BTreeMap` implementation in this file emits a plain `out = in` assignment.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code connecting `in_ident` to `out_ident` where a collection is yielded out
    /// of a tick (going by the method name — confirm against callers).
    ///
    /// The `BTreeMap` implementation in this file emits a plain `out = in` assignment.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code connecting `in_ident` to `out_ident` at the start of an atomic section
    /// (going by the method name — confirm against callers).
    ///
    /// The `BTreeMap` implementation in this file emits a plain `out = in` assignment.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code connecting `in_ident` to `out_ident` at the end of an atomic section
    /// (going by the method name — confirm against callers).
    ///
    /// The `BTreeMap` implementation in this file emits a plain `out = in` assignment.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code connecting `in_ident` to `out_ident` around a non-deterministic
    /// observation at `location`.
    ///
    /// The `BTreeMap` implementation in this file ignores `trusted`, both kinds and
    /// `op_meta`, and emits a plain `out = in` assignment.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network edge: a sender at `from` that feeds `input_ident` into
    /// `sink`, and a receiver at `to` that binds `out_ident` to `source`.
    ///
    /// `serialize` / `deserialize`, when present, are applied on the sending / receiving side
    /// respectively. `tag_id` is used to build the operator tags of the two halves.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits a receiver at `on` that binds `out_ident` to `source_expr`, optionally followed
    /// by `deserialize`. `tag_id` is used to build the operator tag.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits a sender at `on` that feeds `input_ident` into `sink_expr`, optionally preceded
    /// by `serialize`. `tag_id` is used to build the operator tag.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    );
}
402
/// [`DfirBuilder`] backed by one [`FlatGraphBuilder`] per root location, keyed by raw id.
///
/// Batching, tick yielding, atomic boundaries and non-determinism observation are all
/// emitted as plain `out = in` assignments.
#[cfg(feature = "build")]
impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
    /// This builder never asks for intermediate singleton states.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the raw id of the location's root, inserting a
    /// default builder first when that key is absent.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let key = location.root().raw_id();
        self.entry(key).or_default()
    }

    /// Emits `out_ident = in_ident;` into the builder of the input's root location.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits `out_ident = in_ident;` into the builder of the input's root location.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits `out_ident = in_ident;` into the builder of the input's root location.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits `out_ident = in_ident;` into the builder of the input's root location.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits `out_ident = in_ident;` into the builder for `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits the sending pipeline into the builder for `from` and the receiving pipeline
    /// into the builder for `to`, inserting a `map` stage on either side when the matching
    /// (de)serializer is present.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        // The operator tags keep the send and receive halves apart, which otherwise have
        // the same next_stmt_id.
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Emits a `source_stream` over `source_expr` into the builder for `on`, followed by a
    /// `map` stage when `deserialize` is present.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Emits a `dest_sink` over `sink_expr` into the builder for `on`, preceded by a `map`
    /// stage when `serialize` is present.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        // The operator tag keeps send and receive apart, which otherwise have the same
        // next_stmt_id.
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }
    }
}
615
/// Either a DFIR builder to emit code into, or a pair of callbacks to run instead.
///
/// `L` is the callback type for roots and `N` the callback type for nodes. Both also
/// receive a `&mut usize` (presumably a running statement/id counter — confirm against
/// the call sites).
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR through the borrowed builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback (first field) and node callback (second field).
    Callback(L, N),
}
625
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Root that applies the expression `f` to its `input`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root that sends its `input` to an external, identified by `to_external_id` and
    /// `to_key`. `instantiate_fn` starts as `Building` and is finalized by `compile_network`.
    SendExternal {
        to_external_id: usize,
        to_key: usize,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root that feeds its `input` into the sink expression `sink`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root that feeds its `input` into the cycle named by `ident` (presumably paired with a
    /// cycle source of the same name — confirm).
    CycleSink {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
657
658impl HydroRoot {
    #[cfg(feature = "build")]
    /// Instantiates every network-like element reachable from this root.
    ///
    /// Walks the tree bottom-up and, for each `HydroRoot::SendExternal`,
    /// `HydroNode::Network` and `HydroNode::ExternalInput` still in the
    /// `DebugInstantiate::Building` state, asks the deploy provider `D` for the sink/source
    /// expressions and a connect closure, then stores them as `DebugInstantiate::Finalized`.
    /// The connect closures are only stored here; `connect_network` runs them.
    ///
    /// `extra_stmts` receives additional statements per process id, and `processes`,
    /// `clusters` and `externals` map ids to the instantiated deploy nodes.
    ///
    /// # Panics
    ///
    /// Panics if an element is already finalized, if a referenced external or process id is
    /// missing from the maps, or if an external connection is rooted at anything other than
    /// a process (clusters hit `todo!()`).
    pub fn compile_network<'a, D>(
        &mut self,
        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
        seen_tees: &mut SeenTees,
        processes: &HashMap<usize, D::Process>,
        clusters: &HashMap<usize, D::Cluster>,
        externals: &HashMap<usize, D::External>,
    ) where
        D: Deploy<'a>,
    {
        // Both closures below need mutable access to `extra_stmts`, so it is shared through
        // a `RefCell` and borrowed only for the duration of each call that needs it.
        let refcell_extra_stmts = RefCell::new(extra_stmts);
        self.transform_bottom_up(
            &mut |l| {
                if let HydroRoot::SendExternal {
                    input,
                    to_external_id,
                    to_key,
                    to_many,
                    unpaired,
                    instantiate_fn,
                    ..
                } = l
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let to_node = externals
                                .get(to_external_id)
                                .unwrap_or_else(|| {
                                    panic!("A external used in the graph was not instantiated: {}", to_external_id)
                                })
                                .clone();

                            match input.metadata().location_kind.root() {
                                LocationId::Process(process_id) => {
                                    if *to_many {
                                        // Many-connection sink: no ports are allocated here and
                                        // there is nothing to connect later.
                                        (
                                            (
                                                D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
                                                // Placeholder: only the sink side is produced for a root.
                                                parse_quote!(DUMMY),
                                            ),
                                            Box::new(|| {}) as Box<dyn FnOnce()>,
                                        )
                                    } else {
                                        let from_node = processes
                                            .get(process_id)
                                            .unwrap_or_else(|| {
                                                panic!("A process used in the graph was not instantiated: {}", process_id)
                                            })
                                            .clone();

                                        let sink_port = D::allocate_process_port(&from_node);
                                        let source_port: <D as Deploy<'a>>::Port = D::allocate_external_port(&to_node);

                                        if *unpaired {
                                            use stageleft::quote_type;
                                            use tokio_util::codec::LengthDelimitedCodec;

                                            // An unpaired output registers the external port itself.
                                            to_node.register(*to_key, source_port.clone());

                                            // The returned source expression is discarded; the call is
                                            // made for what it does to `extra_stmts`/the deploy nodes.
                                            let _ = D::e2o_source(
                                                refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
                                                &to_node, &source_port,
                                                &from_node, &sink_port,
                                                &quote_type::<LengthDelimitedCodec>(),
                                                format!("{}_{}", *to_external_id, *to_key)
                                            );
                                        }

                                        (
                                            (
                                                D::o2e_sink(
                                                    &from_node,
                                                    &sink_port,
                                                    &to_node,
                                                    &source_port,
                                                    format!("{}_{}", *to_external_id, *to_key)
                                                ),
                                                // Placeholder: only the sink side is produced for a root.
                                                parse_quote!(DUMMY),
                                            ),
                                            // Only an unpaired output has a connection to establish here.
                                            if *unpaired {
                                                D::e2o_connect(
                                                    &to_node,
                                                    &source_port,
                                                    &from_node,
                                                    &sink_port,
                                                    *to_many,
                                                    NetworkHint::Auto,
                                                )
                                            } else {
                                                Box::new(|| {}) as Box<dyn FnOnce()>
                                            },
                                        )
                                    }
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            &mut |n| {
                if let HydroNode::Network {
                    input,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    // Location-to-location edge: from the input's root location to this
                    // node's root location.
                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => instantiate_network::<D>(
                            input.metadata().location_kind.root(),
                            metadata.location_kind.root(),
                            processes,
                            clusters,
                        ),

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                } else if let HydroNode::ExternalInput {
                    from_external_id,
                    from_key,
                    from_many,
                    codec_type,
                    port_hint,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let from_node = externals
                                .get(from_external_id)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "A external used in the graph was not instantiated: {}",
                                        from_external_id
                                    )
                                })
                                .clone();

                            match metadata.location_kind.root() {
                                LocationId::Process(process_id) => {
                                    let to_node = processes
                                        .get(process_id)
                                        .unwrap_or_else(|| {
                                            panic!("A process used in the graph was not instantiated: {}", process_id)
                                        })
                                        .clone();

                                    let sink_port = D::allocate_external_port(&from_node);
                                    let source_port = D::allocate_process_port(&to_node);

                                    from_node.register(*from_key, sink_port.clone());

                                    (
                                        (
                                            // Placeholder: only the source side is produced for an input.
                                            parse_quote!(DUMMY),
                                            if *from_many {
                                                D::e2o_many_source(
                                                    refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_id, *from_key)
                                                )
                                            } else {
                                                D::e2o_source(
                                                    refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
                                                    &from_node, &sink_port,
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_id, *from_key)
                                                )
                                            },
                                        ),
                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
                                    )
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            seen_tees,
            false,
        );
    }
875
876    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
877        self.transform_bottom_up(
878            &mut |l| {
879                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
880                    match instantiate_fn {
881                        DebugInstantiate::Building => panic!("network not built"),
882
883                        DebugInstantiate::Finalized(finalized) => {
884                            (finalized.connect_fn.take().unwrap())();
885                        }
886                    }
887                }
888            },
889            &mut |n| {
890                if let HydroNode::Network { instantiate_fn, .. }
891                | HydroNode::ExternalInput { instantiate_fn, .. } = n
892                {
893                    match instantiate_fn {
894                        DebugInstantiate::Building => panic!("network not built"),
895
896                        DebugInstantiate::Finalized(finalized) => {
897                            (finalized.connect_fn.take().unwrap())();
898                        }
899                    }
900                }
901            },
902            seen_tees,
903            false,
904        );
905    }
906
907    pub fn transform_bottom_up(
908        &mut self,
909        transform_root: &mut impl FnMut(&mut HydroRoot),
910        transform_node: &mut impl FnMut(&mut HydroNode),
911        seen_tees: &mut SeenTees,
912        check_well_formed: bool,
913    ) {
914        self.transform_children(
915            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
916            seen_tees,
917        );
918
919        transform_root(self);
920    }
921
922    pub fn transform_children(
923        &mut self,
924        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
925        seen_tees: &mut SeenTees,
926    ) {
927        match self {
928            HydroRoot::ForEach { input, .. }
929            | HydroRoot::SendExternal { input, .. }
930            | HydroRoot::DestSink { input, .. }
931            | HydroRoot::CycleSink { input, .. } => {
932                transform(input, seen_tees);
933            }
934        }
935    }
936
937    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
938        match self {
939            HydroRoot::ForEach {
940                f,
941                input,
942                op_metadata,
943            } => HydroRoot::ForEach {
944                f: f.clone(),
945                input: Box::new(input.deep_clone(seen_tees)),
946                op_metadata: op_metadata.clone(),
947            },
948            HydroRoot::SendExternal {
949                to_external_id,
950                to_key,
951                to_many,
952                unpaired,
953                serialize_fn,
954                instantiate_fn,
955                input,
956                op_metadata,
957            } => HydroRoot::SendExternal {
958                to_external_id: *to_external_id,
959                to_key: *to_key,
960                to_many: *to_many,
961                unpaired: *unpaired,
962                serialize_fn: serialize_fn.clone(),
963                instantiate_fn: instantiate_fn.clone(),
964                input: Box::new(input.deep_clone(seen_tees)),
965                op_metadata: op_metadata.clone(),
966            },
967            HydroRoot::DestSink {
968                sink,
969                input,
970                op_metadata,
971            } => HydroRoot::DestSink {
972                sink: sink.clone(),
973                input: Box::new(input.deep_clone(seen_tees)),
974                op_metadata: op_metadata.clone(),
975            },
976            HydroRoot::CycleSink {
977                ident,
978                input,
979                op_metadata,
980            } => HydroRoot::CycleSink {
981                ident: ident.clone(),
982                input: Box::new(input.deep_clone(seen_tees)),
983                op_metadata: op_metadata.clone(),
984            },
985        }
986    }
987
988    #[cfg(feature = "build")]
989    pub fn emit<'a, D: Deploy<'a>>(
990        &mut self,
991        graph_builders: &mut dyn DfirBuilder,
992        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
993        next_stmt_id: &mut usize,
994    ) {
995        self.emit_core::<D>(
996            &mut BuildersOrCallback::Builders::<
997                fn(&mut HydroRoot, &mut usize),
998                fn(&mut HydroNode, &mut usize),
999            >(graph_builders),
1000            built_tees,
1001            next_stmt_id,
1002        );
1003    }
1004
1005    #[cfg(feature = "build")]
1006    pub fn emit_core<'a, D: Deploy<'a>>(
1007        &mut self,
1008        builders_or_callback: &mut BuildersOrCallback<
1009            impl FnMut(&mut HydroRoot, &mut usize),
1010            impl FnMut(&mut HydroNode, &mut usize),
1011        >,
1012        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1013        next_stmt_id: &mut usize,
1014    ) {
1015        match self {
1016            HydroRoot::ForEach { f, input, .. } => {
1017                let input_ident =
1018                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
1019
1020                match builders_or_callback {
1021                    BuildersOrCallback::Builders(graph_builders) => {
1022                        graph_builders
1023                            .get_dfir_mut(&input.metadata().location_kind)
1024                            .add_dfir(
1025                                parse_quote! {
1026                                    #input_ident -> for_each(#f);
1027                                },
1028                                None,
1029                                Some(&next_stmt_id.to_string()),
1030                            );
1031                    }
1032                    BuildersOrCallback::Callback(leaf_callback, _) => {
1033                        leaf_callback(self, next_stmt_id);
1034                    }
1035                }
1036
1037                *next_stmt_id += 1;
1038            }
1039
1040            HydroRoot::SendExternal {
1041                serialize_fn,
1042                instantiate_fn,
1043                input,
1044                ..
1045            } => {
1046                let input_ident =
1047                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
1048
1049                match builders_or_callback {
1050                    BuildersOrCallback::Builders(graph_builders) => {
1051                        let (sink_expr, _) = match instantiate_fn {
1052                            DebugInstantiate::Building => (
1053                                syn::parse_quote!(DUMMY_SINK),
1054                                syn::parse_quote!(DUMMY_SOURCE),
1055                            ),
1056
1057                            DebugInstantiate::Finalized(finalized) => {
1058                                (finalized.sink.clone(), finalized.source.clone())
1059                            }
1060                        };
1061
1062                        graph_builders.create_external_output(
1063                            &input.metadata().location_kind,
1064                            sink_expr,
1065                            &input_ident,
1066                            serialize_fn,
1067                            *next_stmt_id,
1068                        );
1069                    }
1070                    BuildersOrCallback::Callback(leaf_callback, _) => {
1071                        leaf_callback(self, next_stmt_id);
1072                    }
1073                }
1074
1075                *next_stmt_id += 1;
1076            }
1077
1078            HydroRoot::DestSink { sink, input, .. } => {
1079                let input_ident =
1080                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
1081
1082                match builders_or_callback {
1083                    BuildersOrCallback::Builders(graph_builders) => {
1084                        graph_builders
1085                            .get_dfir_mut(&input.metadata().location_kind)
1086                            .add_dfir(
1087                                parse_quote! {
1088                                    #input_ident -> dest_sink(#sink);
1089                                },
1090                                None,
1091                                Some(&next_stmt_id.to_string()),
1092                            );
1093                    }
1094                    BuildersOrCallback::Callback(leaf_callback, _) => {
1095                        leaf_callback(self, next_stmt_id);
1096                    }
1097                }
1098
1099                *next_stmt_id += 1;
1100            }
1101
1102            HydroRoot::CycleSink { ident, input, .. } => {
1103                let input_ident =
1104                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
1105
1106                match builders_or_callback {
1107                    BuildersOrCallback::Builders(graph_builders) => {
1108                        graph_builders
1109                            .get_dfir_mut(&input.metadata().location_kind)
1110                            .add_dfir(
1111                                parse_quote! {
1112                                    #ident = #input_ident;
1113                                },
1114                                None,
1115                                None,
1116                            );
1117                    }
1118                    // No ID, no callback
1119                    BuildersOrCallback::Callback(_, _) => {}
1120                }
1121            }
1122        }
1123    }
1124
1125    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1126        match self {
1127            HydroRoot::ForEach { op_metadata, .. }
1128            | HydroRoot::SendExternal { op_metadata, .. }
1129            | HydroRoot::DestSink { op_metadata, .. }
1130            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1131        }
1132    }
1133
1134    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1135        match self {
1136            HydroRoot::ForEach { op_metadata, .. }
1137            | HydroRoot::SendExternal { op_metadata, .. }
1138            | HydroRoot::DestSink { op_metadata, .. }
1139            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1140        }
1141    }
1142
1143    pub fn input(&self) -> &HydroNode {
1144        match self {
1145            HydroRoot::ForEach { input, .. }
1146            | HydroRoot::SendExternal { input, .. }
1147            | HydroRoot::DestSink { input, .. }
1148            | HydroRoot::CycleSink { input, .. } => input,
1149        }
1150    }
1151
1152    pub fn input_metadata(&self) -> &HydroIrMetadata {
1153        self.input().metadata()
1154    }
1155
1156    pub fn print_root(&self) -> String {
1157        match self {
1158            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1159            HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1160            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1161            HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1162        }
1163    }
1164
1165    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1166        match self {
1167            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1168                transform(f);
1169            }
1170            HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1171        }
1172    }
1173}
1174
/// Emits every root in `ir` and returns the resulting graph builders.
///
/// The tee-identifier map and the statement-id counter (starting at 0) are
/// shared across all roots.
#[cfg(feature = "build")]
pub fn emit<'a, D: Deploy<'a>>(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut builders: BTreeMap<usize, FlatGraphBuilder> = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit::<D>(&mut builders, &mut built_tees, &mut next_stmt_id);
    });

    builders
}
1185
/// Walks every root in `ir` through `emit_core` in `Callback` mode.
///
/// `transform_root` and `transform_node` are handed to `emit_core` as the
/// root and node callbacks. The tee map and the statement-id counter
/// (starting at 0) are shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir<'a, D: Deploy<'a>>(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut visited_tees = HashMap::new();
    let mut stmt_counter = 0;
    let mut visitor = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir.iter_mut() {
        root.emit_core::<D>(&mut visitor, &mut visited_tees, &mut stmt_counter);
    }
}
1199
1200pub fn transform_bottom_up(
1201    ir: &mut [HydroRoot],
1202    transform_root: &mut impl FnMut(&mut HydroRoot),
1203    transform_node: &mut impl FnMut(&mut HydroNode),
1204    check_well_formed: bool,
1205) {
1206    let mut seen_tees = HashMap::new();
1207    ir.iter_mut().for_each(|leaf| {
1208        leaf.transform_bottom_up(
1209            transform_root,
1210            transform_node,
1211            &mut seen_tees,
1212            check_well_formed,
1213        );
1214    });
1215}
1216
1217pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1218    let mut seen_tees = HashMap::new();
1219    ir.iter()
1220        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1221        .collect()
1222}
1223
// State for deduplicated tee printing. `None` means deduplication is off.
// `Some((next_id, ids))` holds the next id to hand out and the ids already
// assigned, keyed by the address of the tee's shared cell.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Per-thread instance of the state above; starts out as `None`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1228
1229pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1230    PRINTED_TEES.with(|printed_tees| {
1231        let mut printed_tees_mut = printed_tees.borrow_mut();
1232        *printed_tees_mut = Some((0, HashMap::new()));
1233        drop(printed_tees_mut);
1234
1235        let ret = f();
1236
1237        let mut printed_tees_mut = printed_tees.borrow_mut();
1238        *printed_tees_mut = None;
1239
1240        ret
1241    })
1242}
1243
1244pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1245
1246impl TeeNode {
1247    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1248        Rc::as_ptr(&self.0)
1249    }
1250}
1251
1252impl Debug for TeeNode {
1253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1254        PRINTED_TEES.with(|printed_tees| {
1255            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1256            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1257
1258            if let Some(printed_tees_mut) = printed_tees_mut {
1259                if let Some(existing) = printed_tees_mut
1260                    .1
1261                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1262                {
1263                    write!(f, "<tee {}>", existing)
1264                } else {
1265                    let next_id = printed_tees_mut.0;
1266                    printed_tees_mut.0 += 1;
1267                    printed_tees_mut
1268                        .1
1269                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1270                    drop(printed_tees_mut_borrow);
1271                    write!(f, "<tee {}>: ", next_id)?;
1272                    Debug::fmt(&self.0.borrow(), f)
1273                }
1274            } else {
1275                drop(printed_tees_mut_borrow);
1276                write!(f, "<tee>: ")?;
1277                Debug::fmt(&self.0.borrow(), f)
1278            }
1279        })
1280    }
1281}
1282
1283impl Hash for TeeNode {
1284    fn hash<H: Hasher>(&self, state: &mut H) {
1285        self.0.borrow_mut().hash(state);
1286    }
1287}
1288
/// Boundedness tag recorded in [`CollectionKind`] for streams, singletons,
/// optionals and keyed streams.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1294
/// Ordering tag recorded in [`CollectionKind`] as `order` for streams and as
/// `value_order` for keyed streams.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1300
/// Retry tag recorded in [`CollectionKind`] as `retry` for streams and as
/// `value_retry` for keyed streams.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1306
/// Boundedness tag recorded in [`CollectionKind::KeyedSingleton`].
///
/// Unlike [`BoundKind`], it has a third state, `BoundedValue`.
// NOTE(review): presumably `BoundedValue` means the per-key values are bounded
// while the key set is not; confirm against the code that produces it.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1313
/// Describes the kind of collection an IR node produces, together with its
/// guarantee tags and element types.
///
/// Stored as `collection_kind` in [`HydroIrMetadata`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream of `element_type` values, tagged with bound, order and retry.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton holding an `element_type` value.
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// An optional holding an `element_type` value.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the order and retry tags apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton with its own three-state bound tag.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1343
/// Metadata attached to an IR node.
///
/// This type is deliberately ignored by hashing and equality (see its `Hash`
/// and `PartialEq` impls), and its `Debug` output shows only `location_kind`
/// and `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location of the node. Emission uses the input's `location_kind` to
    /// select the DFIR builder, and the well-formedness check compares its root.
    pub location_kind: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably an estimated or measured output cardinality; confirm with the code that sets it.
    pub cardinality: Option<usize>,
    // NOTE(review): presumably an optional user-facing label; confirm with the code that sets it.
    pub tag: Option<String>,
    /// Operator-specific metadata (backtrace, usage numbers, id).
    pub op: HydroIrOpMetadata,
}
1352
// HydroIrMetadata shouldn't be used to hash or compare
impl Hash for HydroIrMetadata {
    /// Feeds nothing into the hasher, so metadata never affects the hash of a
    /// containing node. This matches `PartialEq`, which treats all values as equal.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1357
impl PartialEq for HydroIrMetadata {
    /// Always returns `true`: metadata is ignored when comparing, so any two
    /// values are considered equal.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}
1363
// `eq` is constantly `true`, which is trivially a total equivalence relation.
impl Eq for HydroIrMetadata {}
1365
1366impl Debug for HydroIrMetadata {
1367    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1368        f.debug_struct("HydroIrMetadata")
1369            .field("location_kind", &self.location_kind)
1370            .field("collection_kind", &self.collection_kind)
1371            .finish()
1372    }
1373}
1374
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was constructed.
    pub backtrace: Backtrace,
    // NOTE(review): presumably filled in by profiling; `new` initializes it to `None`.
    pub cpu_usage: Option<f64>,
    // NOTE(review): presumably filled in by profiling; `new` initializes it to `None`.
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional operator id; `new` initializes it to `None`.
    pub id: Option<usize>,
}
1384
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and every
    /// optional field set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // One extra frame is skipped to account for this wrapper itself.
        Self::new_with_skip(1)
    }

    /// Creates operator metadata, asking `Backtrace::get_backtrace` to skip
    /// `2 + skip_count` frames.
    ///
    /// The skip count depends on the exact call depth between the caller and
    /// the capture, so any wrapper added around this function must adjust it.
    // NOTE(review): the constant 2 presumably covers this function and the
    // capture call itself; confirm against `Backtrace::get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1403
1404impl Debug for HydroIrOpMetadata {
1405    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1406        f.debug_struct("HydroIrOpMetadata").finish()
1407    }
1408}
1409
impl Hash for HydroIrOpMetadata {
    /// Feeds nothing into the hasher, so operator metadata never affects the
    /// hash of a containing node.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1413
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Every variant except `Placeholder` carries a `metadata` field. Upstream
/// nodes are owned through `Box<HydroNode>`, except for `Tee`, which shares
/// its upstream node through a [`TeeNode`].
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Stand-in value that is swapped into a tee's shared cell while its
    /// real contents are being transformed or cloned. `transform_children`
    /// panics if asked to traverse one.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    /// Leaf node: has no upstream node to traverse.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node: has no upstream node to traverse.
    SingletonSource {
        value: DebugExpr,
        metadata: HydroIrMetadata,
    },

    /// Leaf node identified by `ident`; has no upstream node to traverse.
    // NOTE(review): presumably paired with the `HydroRoot::CycleSink` that uses the same ident; confirm.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// References a shared upstream node. Traversals use a tee map keyed by
    /// the shared cell's address so that the shared node is processed once.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two upstream nodes, traversed `first` then `second`.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two upstream nodes, traversed `first` then `second`.
    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two upstream nodes, traversed `left` then `right`.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two upstream nodes, traversed `left` then `right`.
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two upstream nodes, traversed `left` then `right`.
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two upstream nodes, traversed `pos` then `neg`.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two upstream nodes, traversed `pos` then `neg`.
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Two upstream nodes, traversed `input` then `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A network operator over `input`.
    ///
    /// This is the only variant exempt from the location well-formedness
    /// check in `transform_bottom_up`. `connect_network` runs the connect
    /// callback held by a finalized `instantiate_fn`.
    Network {
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node with no upstream node to traverse. `connect_network` runs
    /// the connect callback held by a finalized `instantiate_fn`.
    // NOTE(review): presumably data arriving from an external process identified by `from_external_id` / `from_key`; confirm.
    ExternalInput {
        from_external_id: usize,
        from_key: usize,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1642
/// Maps the address of an original tee cell to the cell that replaces it.
/// Traversals and deep clones use it so each shared tee is processed once.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a [`LocationId`].
// NOTE(review): not used in the visible part of the file; presumably records the location assigned to each tee — confirm at the use sites.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1645
1646impl HydroNode {
1647    pub fn transform_bottom_up(
1648        &mut self,
1649        transform: &mut impl FnMut(&mut HydroNode),
1650        seen_tees: &mut SeenTees,
1651        check_well_formed: bool,
1652    ) {
1653        self.transform_children(
1654            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1655            seen_tees,
1656        );
1657
1658        transform(self);
1659
1660        let self_location = self.metadata().location_kind.root();
1661
1662        if check_well_formed {
1663            match &*self {
1664                HydroNode::Network { .. } => {}
1665                _ => {
1666                    self.input_metadata().iter().for_each(|i| {
1667                        if i.location_kind.root() != self_location {
1668                            panic!(
1669                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1670                                i,
1671                                i.location_kind.root(),
1672                                self,
1673                                self_location
1674                            )
1675                        }
1676                    });
1677                }
1678            }
1679        }
1680    }
1681
1682    #[inline(always)]
1683    pub fn transform_children(
1684        &mut self,
1685        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1686        seen_tees: &mut SeenTees,
1687    ) {
1688        match self {
1689            HydroNode::Placeholder => {
1690                panic!();
1691            }
1692
1693            HydroNode::Source { .. }
1694            | HydroNode::SingletonSource { .. }
1695            | HydroNode::CycleSource { .. }
1696            | HydroNode::ExternalInput { .. } => {}
1697
1698            HydroNode::Tee { inner, .. } => {
1699                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1700                    *inner = TeeNode(transformed.clone());
1701                } else {
1702                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1703                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1704                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1705                    transform(&mut orig, seen_tees);
1706                    *transformed_cell.borrow_mut() = orig;
1707                    *inner = TeeNode(transformed_cell);
1708                }
1709            }
1710
1711            HydroNode::Cast { inner, .. }
1712            | HydroNode::ObserveNonDet { inner, .. }
1713            | HydroNode::BeginAtomic { inner, .. }
1714            | HydroNode::EndAtomic { inner, .. }
1715            | HydroNode::Batch { inner, .. }
1716            | HydroNode::YieldConcat { inner, .. } => {
1717                transform(inner.as_mut(), seen_tees);
1718            }
1719
1720            HydroNode::Chain { first, second, .. } => {
1721                transform(first.as_mut(), seen_tees);
1722                transform(second.as_mut(), seen_tees);
1723            }
1724
1725            HydroNode::ChainFirst { first, second, .. } => {
1726                transform(first.as_mut(), seen_tees);
1727                transform(second.as_mut(), seen_tees);
1728            }
1729
1730            HydroNode::CrossSingleton { left, right, .. }
1731            | HydroNode::CrossProduct { left, right, .. }
1732            | HydroNode::Join { left, right, .. } => {
1733                transform(left.as_mut(), seen_tees);
1734                transform(right.as_mut(), seen_tees);
1735            }
1736
1737            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1738                transform(pos.as_mut(), seen_tees);
1739                transform(neg.as_mut(), seen_tees);
1740            }
1741
1742            HydroNode::ReduceKeyedWatermark {
1743                input, watermark, ..
1744            } => {
1745                transform(input.as_mut(), seen_tees);
1746                transform(watermark.as_mut(), seen_tees);
1747            }
1748
1749            HydroNode::Map { input, .. }
1750            | HydroNode::ResolveFutures { input, .. }
1751            | HydroNode::ResolveFuturesOrdered { input, .. }
1752            | HydroNode::FlatMap { input, .. }
1753            | HydroNode::Filter { input, .. }
1754            | HydroNode::FilterMap { input, .. }
1755            | HydroNode::Sort { input, .. }
1756            | HydroNode::DeferTick { input, .. }
1757            | HydroNode::Enumerate { input, .. }
1758            | HydroNode::Inspect { input, .. }
1759            | HydroNode::Unique { input, .. }
1760            | HydroNode::Network { input, .. }
1761            | HydroNode::Fold { input, .. }
1762            | HydroNode::Scan { input, .. }
1763            | HydroNode::FoldKeyed { input, .. }
1764            | HydroNode::Reduce { input, .. }
1765            | HydroNode::ReduceKeyed { input, .. }
1766            | HydroNode::Counter { input, .. } => {
1767                transform(input.as_mut(), seen_tees);
1768            }
1769        }
1770    }
1771
1772    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1773        match self {
1774            HydroNode::Placeholder => HydroNode::Placeholder,
1775            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1776                inner: Box::new(inner.deep_clone(seen_tees)),
1777                metadata: metadata.clone(),
1778            },
1779            HydroNode::ObserveNonDet {
1780                inner,
1781                trusted,
1782                metadata,
1783            } => HydroNode::ObserveNonDet {
1784                inner: Box::new(inner.deep_clone(seen_tees)),
1785                trusted: *trusted,
1786                metadata: metadata.clone(),
1787            },
1788            HydroNode::Source { source, metadata } => HydroNode::Source {
1789                source: source.clone(),
1790                metadata: metadata.clone(),
1791            },
1792            HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1793                value: value.clone(),
1794                metadata: metadata.clone(),
1795            },
1796            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1797                ident: ident.clone(),
1798                metadata: metadata.clone(),
1799            },
1800            HydroNode::Tee { inner, metadata } => {
1801                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1802                    HydroNode::Tee {
1803                        inner: TeeNode(transformed.clone()),
1804                        metadata: metadata.clone(),
1805                    }
1806                } else {
1807                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1808                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1809                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1810                    *new_rc.borrow_mut() = cloned;
1811                    HydroNode::Tee {
1812                        inner: TeeNode(new_rc),
1813                        metadata: metadata.clone(),
1814                    }
1815                }
1816            }
1817            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1818                inner: Box::new(inner.deep_clone(seen_tees)),
1819                metadata: metadata.clone(),
1820            },
1821            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1822                inner: Box::new(inner.deep_clone(seen_tees)),
1823                metadata: metadata.clone(),
1824            },
1825            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1826                inner: Box::new(inner.deep_clone(seen_tees)),
1827                metadata: metadata.clone(),
1828            },
1829            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1830                inner: Box::new(inner.deep_clone(seen_tees)),
1831                metadata: metadata.clone(),
1832            },
1833            HydroNode::Chain {
1834                first,
1835                second,
1836                metadata,
1837            } => HydroNode::Chain {
1838                first: Box::new(first.deep_clone(seen_tees)),
1839                second: Box::new(second.deep_clone(seen_tees)),
1840                metadata: metadata.clone(),
1841            },
1842            HydroNode::ChainFirst {
1843                first,
1844                second,
1845                metadata,
1846            } => HydroNode::ChainFirst {
1847                first: Box::new(first.deep_clone(seen_tees)),
1848                second: Box::new(second.deep_clone(seen_tees)),
1849                metadata: metadata.clone(),
1850            },
1851            HydroNode::CrossProduct {
1852                left,
1853                right,
1854                metadata,
1855            } => HydroNode::CrossProduct {
1856                left: Box::new(left.deep_clone(seen_tees)),
1857                right: Box::new(right.deep_clone(seen_tees)),
1858                metadata: metadata.clone(),
1859            },
1860            HydroNode::CrossSingleton {
1861                left,
1862                right,
1863                metadata,
1864            } => HydroNode::CrossSingleton {
1865                left: Box::new(left.deep_clone(seen_tees)),
1866                right: Box::new(right.deep_clone(seen_tees)),
1867                metadata: metadata.clone(),
1868            },
1869            HydroNode::Join {
1870                left,
1871                right,
1872                metadata,
1873            } => HydroNode::Join {
1874                left: Box::new(left.deep_clone(seen_tees)),
1875                right: Box::new(right.deep_clone(seen_tees)),
1876                metadata: metadata.clone(),
1877            },
1878            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1879                pos: Box::new(pos.deep_clone(seen_tees)),
1880                neg: Box::new(neg.deep_clone(seen_tees)),
1881                metadata: metadata.clone(),
1882            },
1883            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1884                pos: Box::new(pos.deep_clone(seen_tees)),
1885                neg: Box::new(neg.deep_clone(seen_tees)),
1886                metadata: metadata.clone(),
1887            },
1888            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1889                input: Box::new(input.deep_clone(seen_tees)),
1890                metadata: metadata.clone(),
1891            },
1892            HydroNode::ResolveFuturesOrdered { input, metadata } => {
1893                HydroNode::ResolveFuturesOrdered {
1894                    input: Box::new(input.deep_clone(seen_tees)),
1895                    metadata: metadata.clone(),
1896                }
1897            }
1898            HydroNode::Map { f, input, metadata } => HydroNode::Map {
1899                f: f.clone(),
1900                input: Box::new(input.deep_clone(seen_tees)),
1901                metadata: metadata.clone(),
1902            },
1903            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1904                f: f.clone(),
1905                input: Box::new(input.deep_clone(seen_tees)),
1906                metadata: metadata.clone(),
1907            },
1908            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1909                f: f.clone(),
1910                input: Box::new(input.deep_clone(seen_tees)),
1911                metadata: metadata.clone(),
1912            },
1913            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1914                f: f.clone(),
1915                input: Box::new(input.deep_clone(seen_tees)),
1916                metadata: metadata.clone(),
1917            },
1918            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1919                input: Box::new(input.deep_clone(seen_tees)),
1920                metadata: metadata.clone(),
1921            },
1922            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1923                input: Box::new(input.deep_clone(seen_tees)),
1924                metadata: metadata.clone(),
1925            },
1926            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1927                f: f.clone(),
1928                input: Box::new(input.deep_clone(seen_tees)),
1929                metadata: metadata.clone(),
1930            },
1931            HydroNode::Unique { input, metadata } => HydroNode::Unique {
1932                input: Box::new(input.deep_clone(seen_tees)),
1933                metadata: metadata.clone(),
1934            },
1935            HydroNode::Sort { input, metadata } => HydroNode::Sort {
1936                input: Box::new(input.deep_clone(seen_tees)),
1937                metadata: metadata.clone(),
1938            },
1939            HydroNode::Fold {
1940                init,
1941                acc,
1942                input,
1943                metadata,
1944            } => HydroNode::Fold {
1945                init: init.clone(),
1946                acc: acc.clone(),
1947                input: Box::new(input.deep_clone(seen_tees)),
1948                metadata: metadata.clone(),
1949            },
1950            HydroNode::Scan {
1951                init,
1952                acc,
1953                input,
1954                metadata,
1955            } => HydroNode::Scan {
1956                init: init.clone(),
1957                acc: acc.clone(),
1958                input: Box::new(input.deep_clone(seen_tees)),
1959                metadata: metadata.clone(),
1960            },
1961            HydroNode::FoldKeyed {
1962                init,
1963                acc,
1964                input,
1965                metadata,
1966            } => HydroNode::FoldKeyed {
1967                init: init.clone(),
1968                acc: acc.clone(),
1969                input: Box::new(input.deep_clone(seen_tees)),
1970                metadata: metadata.clone(),
1971            },
1972            HydroNode::ReduceKeyedWatermark {
1973                f,
1974                input,
1975                watermark,
1976                metadata,
1977            } => HydroNode::ReduceKeyedWatermark {
1978                f: f.clone(),
1979                input: Box::new(input.deep_clone(seen_tees)),
1980                watermark: Box::new(watermark.deep_clone(seen_tees)),
1981                metadata: metadata.clone(),
1982            },
1983            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1984                f: f.clone(),
1985                input: Box::new(input.deep_clone(seen_tees)),
1986                metadata: metadata.clone(),
1987            },
1988            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1989                f: f.clone(),
1990                input: Box::new(input.deep_clone(seen_tees)),
1991                metadata: metadata.clone(),
1992            },
1993            HydroNode::Network {
1994                serialize_fn,
1995                instantiate_fn,
1996                deserialize_fn,
1997                input,
1998                metadata,
1999            } => HydroNode::Network {
2000                serialize_fn: serialize_fn.clone(),
2001                instantiate_fn: instantiate_fn.clone(),
2002                deserialize_fn: deserialize_fn.clone(),
2003                input: Box::new(input.deep_clone(seen_tees)),
2004                metadata: metadata.clone(),
2005            },
2006            HydroNode::ExternalInput {
2007                from_external_id,
2008                from_key,
2009                from_many,
2010                codec_type,
2011                port_hint,
2012                instantiate_fn,
2013                deserialize_fn,
2014                metadata,
2015            } => HydroNode::ExternalInput {
2016                from_external_id: *from_external_id,
2017                from_key: *from_key,
2018                from_many: *from_many,
2019                codec_type: codec_type.clone(),
2020                port_hint: *port_hint,
2021                instantiate_fn: instantiate_fn.clone(),
2022                deserialize_fn: deserialize_fn.clone(),
2023                metadata: metadata.clone(),
2024            },
2025            HydroNode::Counter {
2026                tag,
2027                duration,
2028                prefix,
2029                input,
2030                metadata,
2031            } => HydroNode::Counter {
2032                tag: tag.clone(),
2033                duration: duration.clone(),
2034                prefix: prefix.clone(),
2035                input: Box::new(input.deep_clone(seen_tees)),
2036                metadata: metadata.clone(),
2037            },
2038        }
2039    }
2040
2041    #[cfg(feature = "build")]
2042    pub fn emit_core<'a, D: Deploy<'a>>(
2043        &mut self,
2044        builders_or_callback: &mut BuildersOrCallback<
2045            impl FnMut(&mut HydroRoot, &mut usize),
2046            impl FnMut(&mut HydroNode, &mut usize),
2047        >,
2048        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2049        next_stmt_id: &mut usize,
2050    ) -> syn::Ident {
2051        let out_location = self.metadata().location_kind.clone();
2052        match self {
2053            HydroNode::Placeholder => {
2054                panic!()
2055            }
2056
2057            HydroNode::Cast { inner, .. } => {
2058                let inner_ident =
2059                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2060
2061                match builders_or_callback {
2062                    BuildersOrCallback::Builders(_) => {}
2063                    BuildersOrCallback::Callback(_, node_callback) => {
2064                        node_callback(self, next_stmt_id);
2065                    }
2066                }
2067
2068                *next_stmt_id += 1;
2069
2070                inner_ident
2071            }
2072
2073            HydroNode::ObserveNonDet {
2074                inner,
2075                trusted,
2076                metadata,
2077                ..
2078            } => {
2079                let inner_ident =
2080                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2081
2082                let observe_ident =
2083                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2084
2085                match builders_or_callback {
2086                    BuildersOrCallback::Builders(graph_builders) => {
2087                        graph_builders.observe_nondet(
2088                            *trusted,
2089                            &inner.metadata().location_kind,
2090                            inner_ident,
2091                            &inner.metadata().collection_kind,
2092                            &observe_ident,
2093                            &metadata.collection_kind,
2094                            &metadata.op,
2095                        );
2096                    }
2097                    BuildersOrCallback::Callback(_, node_callback) => {
2098                        node_callback(self, next_stmt_id);
2099                    }
2100                }
2101
2102                *next_stmt_id += 1;
2103
2104                observe_ident
2105            }
2106
2107            HydroNode::Batch {
2108                inner, metadata, ..
2109            } => {
2110                let inner_ident =
2111                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2112
2113                let batch_ident =
2114                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2115
2116                match builders_or_callback {
2117                    BuildersOrCallback::Builders(graph_builders) => {
2118                        graph_builders.batch(
2119                            inner_ident,
2120                            &inner.metadata().location_kind,
2121                            &inner.metadata().collection_kind,
2122                            &batch_ident,
2123                            &out_location,
2124                            &metadata.op,
2125                        );
2126                    }
2127                    BuildersOrCallback::Callback(_, node_callback) => {
2128                        node_callback(self, next_stmt_id);
2129                    }
2130                }
2131
2132                *next_stmt_id += 1;
2133
2134                batch_ident
2135            }
2136
2137            HydroNode::YieldConcat { inner, .. } => {
2138                let inner_ident =
2139                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2140
2141                let yield_ident =
2142                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2143
2144                match builders_or_callback {
2145                    BuildersOrCallback::Builders(graph_builders) => {
2146                        graph_builders.yield_from_tick(
2147                            inner_ident,
2148                            &inner.metadata().location_kind,
2149                            &inner.metadata().collection_kind,
2150                            &yield_ident,
2151                            &out_location,
2152                        );
2153                    }
2154                    BuildersOrCallback::Callback(_, node_callback) => {
2155                        node_callback(self, next_stmt_id);
2156                    }
2157                }
2158
2159                *next_stmt_id += 1;
2160
2161                yield_ident
2162            }
2163
2164            HydroNode::BeginAtomic { inner, metadata } => {
2165                let inner_ident =
2166                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2167
2168                let begin_ident =
2169                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2170
2171                match builders_or_callback {
2172                    BuildersOrCallback::Builders(graph_builders) => {
2173                        graph_builders.begin_atomic(
2174                            inner_ident,
2175                            &inner.metadata().location_kind,
2176                            &inner.metadata().collection_kind,
2177                            &begin_ident,
2178                            &out_location,
2179                            &metadata.op,
2180                        );
2181                    }
2182                    BuildersOrCallback::Callback(_, node_callback) => {
2183                        node_callback(self, next_stmt_id);
2184                    }
2185                }
2186
2187                *next_stmt_id += 1;
2188
2189                begin_ident
2190            }
2191
2192            HydroNode::EndAtomic { inner, .. } => {
2193                let inner_ident =
2194                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2195
2196                let end_ident =
2197                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2198
2199                match builders_or_callback {
2200                    BuildersOrCallback::Builders(graph_builders) => {
2201                        graph_builders.end_atomic(
2202                            inner_ident,
2203                            &inner.metadata().location_kind,
2204                            &inner.metadata().collection_kind,
2205                            &end_ident,
2206                        );
2207                    }
2208                    BuildersOrCallback::Callback(_, node_callback) => {
2209                        node_callback(self, next_stmt_id);
2210                    }
2211                }
2212
2213                *next_stmt_id += 1;
2214
2215                end_ident
2216            }
2217
2218            HydroNode::Source {
2219                source, metadata, ..
2220            } => {
2221                if let HydroSource::ExternalNetwork() = source {
2222                    syn::Ident::new("DUMMY", Span::call_site())
2223                } else {
2224                    let source_ident =
2225                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2226
2227                    let source_stmt = match source {
2228                        HydroSource::Stream(expr) => {
2229                            debug_assert!(metadata.location_kind.is_top_level());
2230                            parse_quote! {
2231                                #source_ident = source_stream(#expr);
2232                            }
2233                        }
2234
2235                        HydroSource::ExternalNetwork() => {
2236                            unreachable!()
2237                        }
2238
2239                        HydroSource::Iter(expr) => {
2240                            if metadata.location_kind.is_top_level() {
2241                                parse_quote! {
2242                                    #source_ident = source_iter(#expr);
2243                                }
2244                            } else {
2245                                // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
2246                                parse_quote! {
2247                                    #source_ident = source_iter(#expr) -> persist::<'static>();
2248                                }
2249                            }
2250                        }
2251
2252                        HydroSource::Spin() => {
2253                            debug_assert!(metadata.location_kind.is_top_level());
2254                            parse_quote! {
2255                                #source_ident = spin();
2256                            }
2257                        }
2258
2259                        HydroSource::ClusterMembers(location_id) => {
2260                            debug_assert!(metadata.location_kind.is_top_level());
2261
2262                            let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2263                                D::cluster_membership_stream(location_id),
2264                                &(),
2265                            );
2266
2267                            parse_quote! {
2268                                #source_ident = source_stream(#expr);
2269                            }
2270                        }
2271                    };
2272
2273                    match builders_or_callback {
2274                        BuildersOrCallback::Builders(graph_builders) => {
2275                            let builder = graph_builders.get_dfir_mut(&out_location);
2276                            builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2277                        }
2278                        BuildersOrCallback::Callback(_, node_callback) => {
2279                            node_callback(self, next_stmt_id);
2280                        }
2281                    }
2282
2283                    *next_stmt_id += 1;
2284
2285                    source_ident
2286                }
2287            }
2288
2289            HydroNode::SingletonSource { value, metadata } => {
2290                let source_ident =
2291                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2292
2293                match builders_or_callback {
2294                    BuildersOrCallback::Builders(graph_builders) => {
2295                        let should_replay = !graph_builders.singleton_intermediates();
2296                        let builder = graph_builders.get_dfir_mut(&out_location);
2297
2298                        if should_replay || !metadata.location_kind.is_top_level() {
2299                            builder.add_dfir(
2300                                parse_quote! {
2301                                    #source_ident = source_iter([#value]) -> persist::<'static>();
2302                                },
2303                                None,
2304                                Some(&next_stmt_id.to_string()),
2305                            );
2306                        } else {
2307                            builder.add_dfir(
2308                                parse_quote! {
2309                                    #source_ident = source_iter([#value]);
2310                                },
2311                                None,
2312                                Some(&next_stmt_id.to_string()),
2313                            );
2314                        }
2315                    }
2316                    BuildersOrCallback::Callback(_, node_callback) => {
2317                        node_callback(self, next_stmt_id);
2318                    }
2319                }
2320
2321                *next_stmt_id += 1;
2322
2323                source_ident
2324            }
2325
2326            HydroNode::CycleSource { ident, .. } => {
2327                let ident = ident.clone();
2328
2329                match builders_or_callback {
2330                    BuildersOrCallback::Builders(_) => {}
2331                    BuildersOrCallback::Callback(_, node_callback) => {
2332                        node_callback(self, next_stmt_id);
2333                    }
2334                }
2335
2336                // consume a stmt id even though we did not emit anything so that we can instrument this
2337                *next_stmt_id += 1;
2338
2339                ident
2340            }
2341
2342            HydroNode::Tee { inner, .. } => {
2343                let ret_ident = if let Some(teed_from) =
2344                    built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2345                {
2346                    match builders_or_callback {
2347                        BuildersOrCallback::Builders(_) => {}
2348                        BuildersOrCallback::Callback(_, node_callback) => {
2349                            node_callback(self, next_stmt_id);
2350                        }
2351                    }
2352
2353                    teed_from.clone()
2354                } else {
2355                    let inner_ident = inner.0.borrow_mut().emit_core::<D>(
2356                        builders_or_callback,
2357                        built_tees,
2358                        next_stmt_id,
2359                    );
2360
2361                    let tee_ident =
2362                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2363
2364                    built_tees.insert(
2365                        inner.0.as_ref() as *const RefCell<HydroNode>,
2366                        tee_ident.clone(),
2367                    );
2368
2369                    match builders_or_callback {
2370                        BuildersOrCallback::Builders(graph_builders) => {
2371                            let builder = graph_builders.get_dfir_mut(&out_location);
2372                            builder.add_dfir(
2373                                parse_quote! {
2374                                    #tee_ident = #inner_ident -> tee();
2375                                },
2376                                None,
2377                                Some(&next_stmt_id.to_string()),
2378                            );
2379                        }
2380                        BuildersOrCallback::Callback(_, node_callback) => {
2381                            node_callback(self, next_stmt_id);
2382                        }
2383                    }
2384
2385                    tee_ident
2386                };
2387
2388                // we consume a stmt id regardless of if we emit the tee() operator,
2389                // so that during rewrites we touch all recipients of the tee()
2390
2391                *next_stmt_id += 1;
2392                ret_ident
2393            }
2394
2395            HydroNode::Chain { first, second, .. } => {
2396                let first_ident =
2397                    first.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2398                let second_ident =
2399                    second.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2400
2401                let chain_ident =
2402                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2403
2404                match builders_or_callback {
2405                    BuildersOrCallback::Builders(graph_builders) => {
2406                        let builder = graph_builders.get_dfir_mut(&out_location);
2407                        builder.add_dfir(
2408                            parse_quote! {
2409                                #chain_ident = chain();
2410                                #first_ident -> [0]#chain_ident;
2411                                #second_ident -> [1]#chain_ident;
2412                            },
2413                            None,
2414                            Some(&next_stmt_id.to_string()),
2415                        );
2416                    }
2417                    BuildersOrCallback::Callback(_, node_callback) => {
2418                        node_callback(self, next_stmt_id);
2419                    }
2420                }
2421
2422                *next_stmt_id += 1;
2423
2424                chain_ident
2425            }
2426
2427            HydroNode::ChainFirst { first, second, .. } => {
2428                let first_ident =
2429                    first.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2430                let second_ident =
2431                    second.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2432
2433                let chain_ident =
2434                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2435
2436                match builders_or_callback {
2437                    BuildersOrCallback::Builders(graph_builders) => {
2438                        let builder = graph_builders.get_dfir_mut(&out_location);
2439                        builder.add_dfir(
2440                            parse_quote! {
2441                                #chain_ident = chain_first_n(1);
2442                                #first_ident -> [0]#chain_ident;
2443                                #second_ident -> [1]#chain_ident;
2444                            },
2445                            None,
2446                            Some(&next_stmt_id.to_string()),
2447                        );
2448                    }
2449                    BuildersOrCallback::Callback(_, node_callback) => {
2450                        node_callback(self, next_stmt_id);
2451                    }
2452                }
2453
2454                *next_stmt_id += 1;
2455
2456                chain_ident
2457            }
2458
2459            HydroNode::CrossSingleton { left, right, .. } => {
2460                let left_ident =
2461                    left.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2462                let right_ident =
2463                    right.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2464
2465                let cross_ident =
2466                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2467
2468                match builders_or_callback {
2469                    BuildersOrCallback::Builders(graph_builders) => {
2470                        let builder = graph_builders.get_dfir_mut(&out_location);
2471                        builder.add_dfir(
2472                            parse_quote! {
2473                                #cross_ident = cross_singleton();
2474                                #left_ident -> [input]#cross_ident;
2475                                #right_ident -> [single]#cross_ident;
2476                            },
2477                            None,
2478                            Some(&next_stmt_id.to_string()),
2479                        );
2480                    }
2481                    BuildersOrCallback::Callback(_, node_callback) => {
2482                        node_callback(self, next_stmt_id);
2483                    }
2484                }
2485
2486                *next_stmt_id += 1;
2487
2488                cross_ident
2489            }
2490
2491            HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2492                let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
2493                    parse_quote!(cross_join_multiset)
2494                } else {
2495                    parse_quote!(join_multiset)
2496                };
2497
2498                let (HydroNode::CrossProduct { left, right, .. }
2499                | HydroNode::Join { left, right, .. }) = self
2500                else {
2501                    unreachable!()
2502                };
2503
2504                let is_top_level = left.metadata().location_kind.is_top_level()
2505                    && right.metadata().location_kind.is_top_level();
2506                let (left_inner, left_lifetime) = if left.metadata().location_kind.is_top_level() {
2507                    (left, quote!('static))
2508                } else {
2509                    (left, quote!('tick))
2510                };
2511
2512                let (right_inner, right_lifetime) = if right.metadata().location_kind.is_top_level()
2513                {
2514                    (right, quote!('static))
2515                } else {
2516                    (right, quote!('tick))
2517                };
2518
2519                let left_ident =
2520                    left_inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2521                let right_ident =
2522                    right_inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2523
2524                let stream_ident =
2525                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2526
2527                match builders_or_callback {
2528                    BuildersOrCallback::Builders(graph_builders) => {
2529                        let builder = graph_builders.get_dfir_mut(&out_location);
2530                        builder.add_dfir(
2531                            if is_top_level {
2532                                // if both inputs are root, the output is expected to have streamy semantics, so we need
2533                                // a multiset_delta() to negate the replay behavior
2534                                parse_quote! {
2535                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2536                                    #left_ident -> [0]#stream_ident;
2537                                    #right_ident -> [1]#stream_ident;
2538                                }
2539                            } else {
2540                                parse_quote! {
2541                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2542                                    #left_ident -> [0]#stream_ident;
2543                                    #right_ident -> [1]#stream_ident;
2544                                }
2545                            }
2546                            ,
2547                            None,
2548                            Some(&next_stmt_id.to_string()),
2549                        );
2550                    }
2551                    BuildersOrCallback::Callback(_, node_callback) => {
2552                        node_callback(self, next_stmt_id);
2553                    }
2554                }
2555
2556                *next_stmt_id += 1;
2557
2558                stream_ident
2559            }
2560
2561            HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2562                let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2563                    parse_quote!(difference)
2564                } else {
2565                    parse_quote!(anti_join)
2566                };
2567
2568                let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2569                    self
2570                else {
2571                    unreachable!()
2572                };
2573
2574                let (neg, neg_lifetime) = if neg.metadata().location_kind.is_top_level() {
2575                    (neg, quote!('static))
2576                } else {
2577                    (neg, quote!('tick))
2578                };
2579
2580                let pos_ident = pos.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2581                let neg_ident = neg.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2582
2583                let stream_ident =
2584                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2585
2586                match builders_or_callback {
2587                    BuildersOrCallback::Builders(graph_builders) => {
2588                        let builder = graph_builders.get_dfir_mut(&out_location);
2589                        builder.add_dfir(
2590                            parse_quote! {
2591                                #stream_ident = #operator::<'tick, #neg_lifetime>();
2592                                #pos_ident -> [pos]#stream_ident;
2593                                #neg_ident -> [neg]#stream_ident;
2594                            },
2595                            None,
2596                            Some(&next_stmt_id.to_string()),
2597                        );
2598                    }
2599                    BuildersOrCallback::Callback(_, node_callback) => {
2600                        node_callback(self, next_stmt_id);
2601                    }
2602                }
2603
2604                *next_stmt_id += 1;
2605
2606                stream_ident
2607            }
2608
2609            HydroNode::ResolveFutures { input, .. } => {
2610                let input_ident =
2611                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2612
2613                let futures_ident =
2614                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2615
2616                match builders_or_callback {
2617                    BuildersOrCallback::Builders(graph_builders) => {
2618                        let builder = graph_builders.get_dfir_mut(&out_location);
2619                        builder.add_dfir(
2620                            parse_quote! {
2621                                #futures_ident = #input_ident -> resolve_futures();
2622                            },
2623                            None,
2624                            Some(&next_stmt_id.to_string()),
2625                        );
2626                    }
2627                    BuildersOrCallback::Callback(_, node_callback) => {
2628                        node_callback(self, next_stmt_id);
2629                    }
2630                }
2631
2632                *next_stmt_id += 1;
2633
2634                futures_ident
2635            }
2636
2637            HydroNode::ResolveFuturesOrdered { input, .. } => {
2638                let input_ident =
2639                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2640
2641                let futures_ident =
2642                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2643
2644                match builders_or_callback {
2645                    BuildersOrCallback::Builders(graph_builders) => {
2646                        let builder = graph_builders.get_dfir_mut(&out_location);
2647                        builder.add_dfir(
2648                            parse_quote! {
2649                                #futures_ident = #input_ident -> resolve_futures_ordered();
2650                            },
2651                            None,
2652                            Some(&next_stmt_id.to_string()),
2653                        );
2654                    }
2655                    BuildersOrCallback::Callback(_, node_callback) => {
2656                        node_callback(self, next_stmt_id);
2657                    }
2658                }
2659
2660                *next_stmt_id += 1;
2661
2662                futures_ident
2663            }
2664
2665            HydroNode::Map { f, input, .. } => {
2666                let input_ident =
2667                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2668
2669                let map_ident =
2670                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2671
2672                match builders_or_callback {
2673                    BuildersOrCallback::Builders(graph_builders) => {
2674                        let builder = graph_builders.get_dfir_mut(&out_location);
2675                        builder.add_dfir(
2676                            parse_quote! {
2677                                #map_ident = #input_ident -> map(#f);
2678                            },
2679                            None,
2680                            Some(&next_stmt_id.to_string()),
2681                        );
2682                    }
2683                    BuildersOrCallback::Callback(_, node_callback) => {
2684                        node_callback(self, next_stmt_id);
2685                    }
2686                }
2687
2688                *next_stmt_id += 1;
2689
2690                map_ident
2691            }
2692
2693            HydroNode::FlatMap { f, input, .. } => {
2694                let input_ident =
2695                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2696
2697                let flat_map_ident =
2698                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2699
2700                match builders_or_callback {
2701                    BuildersOrCallback::Builders(graph_builders) => {
2702                        let builder = graph_builders.get_dfir_mut(&out_location);
2703                        builder.add_dfir(
2704                            parse_quote! {
2705                                #flat_map_ident = #input_ident -> flat_map(#f);
2706                            },
2707                            None,
2708                            Some(&next_stmt_id.to_string()),
2709                        );
2710                    }
2711                    BuildersOrCallback::Callback(_, node_callback) => {
2712                        node_callback(self, next_stmt_id);
2713                    }
2714                }
2715
2716                *next_stmt_id += 1;
2717
2718                flat_map_ident
2719            }
2720
2721            HydroNode::Filter { f, input, .. } => {
2722                let input_ident =
2723                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2724
2725                let filter_ident =
2726                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2727
2728                match builders_or_callback {
2729                    BuildersOrCallback::Builders(graph_builders) => {
2730                        let builder = graph_builders.get_dfir_mut(&out_location);
2731                        builder.add_dfir(
2732                            parse_quote! {
2733                                #filter_ident = #input_ident -> filter(#f);
2734                            },
2735                            None,
2736                            Some(&next_stmt_id.to_string()),
2737                        );
2738                    }
2739                    BuildersOrCallback::Callback(_, node_callback) => {
2740                        node_callback(self, next_stmt_id);
2741                    }
2742                }
2743
2744                *next_stmt_id += 1;
2745
2746                filter_ident
2747            }
2748
2749            HydroNode::FilterMap { f, input, .. } => {
2750                let input_ident =
2751                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2752
2753                let filter_map_ident =
2754                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2755
2756                match builders_or_callback {
2757                    BuildersOrCallback::Builders(graph_builders) => {
2758                        let builder = graph_builders.get_dfir_mut(&out_location);
2759                        builder.add_dfir(
2760                            parse_quote! {
2761                                #filter_map_ident = #input_ident -> filter_map(#f);
2762                            },
2763                            None,
2764                            Some(&next_stmt_id.to_string()),
2765                        );
2766                    }
2767                    BuildersOrCallback::Callback(_, node_callback) => {
2768                        node_callback(self, next_stmt_id);
2769                    }
2770                }
2771
2772                *next_stmt_id += 1;
2773
2774                filter_map_ident
2775            }
2776
2777            HydroNode::Sort { input, .. } => {
2778                let input_ident =
2779                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2780
2781                let sort_ident =
2782                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2783
2784                match builders_or_callback {
2785                    BuildersOrCallback::Builders(graph_builders) => {
2786                        let builder = graph_builders.get_dfir_mut(&out_location);
2787                        builder.add_dfir(
2788                            parse_quote! {
2789                                #sort_ident = #input_ident -> sort();
2790                            },
2791                            None,
2792                            Some(&next_stmt_id.to_string()),
2793                        );
2794                    }
2795                    BuildersOrCallback::Callback(_, node_callback) => {
2796                        node_callback(self, next_stmt_id);
2797                    }
2798                }
2799
2800                *next_stmt_id += 1;
2801
2802                sort_ident
2803            }
2804
2805            HydroNode::DeferTick { input, .. } => {
2806                let input_ident =
2807                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2808
2809                let defer_tick_ident =
2810                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2811
2812                match builders_or_callback {
2813                    BuildersOrCallback::Builders(graph_builders) => {
2814                        let builder = graph_builders.get_dfir_mut(&out_location);
2815                        builder.add_dfir(
2816                            parse_quote! {
2817                                #defer_tick_ident = #input_ident -> defer_tick_lazy();
2818                            },
2819                            None,
2820                            Some(&next_stmt_id.to_string()),
2821                        );
2822                    }
2823                    BuildersOrCallback::Callback(_, node_callback) => {
2824                        node_callback(self, next_stmt_id);
2825                    }
2826                }
2827
2828                *next_stmt_id += 1;
2829
2830                defer_tick_ident
2831            }
2832
2833            HydroNode::Enumerate { input, .. } => {
2834                let input_ident =
2835                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2836
2837                let enumerate_ident =
2838                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2839
2840                match builders_or_callback {
2841                    BuildersOrCallback::Builders(graph_builders) => {
2842                        let builder = graph_builders.get_dfir_mut(&out_location);
2843                        let lifetime = if input.metadata().location_kind.is_top_level() {
2844                            quote!('static)
2845                        } else {
2846                            quote!('tick)
2847                        };
2848                        builder.add_dfir(
2849                            parse_quote! {
2850                                #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2851                            },
2852                            None,
2853                            Some(&next_stmt_id.to_string()),
2854                        );
2855                    }
2856                    BuildersOrCallback::Callback(_, node_callback) => {
2857                        node_callback(self, next_stmt_id);
2858                    }
2859                }
2860
2861                *next_stmt_id += 1;
2862
2863                enumerate_ident
2864            }
2865
2866            HydroNode::Inspect { f, input, .. } => {
2867                let input_ident =
2868                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2869
2870                let inspect_ident =
2871                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2872
2873                match builders_or_callback {
2874                    BuildersOrCallback::Builders(graph_builders) => {
2875                        let builder = graph_builders.get_dfir_mut(&out_location);
2876                        builder.add_dfir(
2877                            parse_quote! {
2878                                #inspect_ident = #input_ident -> inspect(#f);
2879                            },
2880                            None,
2881                            Some(&next_stmt_id.to_string()),
2882                        );
2883                    }
2884                    BuildersOrCallback::Callback(_, node_callback) => {
2885                        node_callback(self, next_stmt_id);
2886                    }
2887                }
2888
2889                *next_stmt_id += 1;
2890
2891                inspect_ident
2892            }
2893
2894            HydroNode::Unique { input, .. } => {
2895                let input_ident =
2896                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2897
2898                let unique_ident =
2899                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2900
2901                match builders_or_callback {
2902                    BuildersOrCallback::Builders(graph_builders) => {
2903                        let builder = graph_builders.get_dfir_mut(&out_location);
2904                        let lifetime = if input.metadata().location_kind.is_top_level() {
2905                            quote!('static)
2906                        } else {
2907                            quote!('tick)
2908                        };
2909
2910                        builder.add_dfir(
2911                            parse_quote! {
2912                                #unique_ident = #input_ident -> unique::<#lifetime>();
2913                            },
2914                            None,
2915                            Some(&next_stmt_id.to_string()),
2916                        );
2917                    }
2918                    BuildersOrCallback::Callback(_, node_callback) => {
2919                        node_callback(self, next_stmt_id);
2920                    }
2921                }
2922
2923                *next_stmt_id += 1;
2924
2925                unique_ident
2926            }
2927
2928            HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2929                let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2930                    parse_quote!(fold)
2931                } else if matches!(self, HydroNode::Scan { .. }) {
2932                    parse_quote!(scan)
2933                } else {
2934                    parse_quote!(fold_keyed)
2935                };
2936
2937                let (HydroNode::Fold { input, .. }
2938                | HydroNode::FoldKeyed { input, .. }
2939                | HydroNode::Scan { input, .. }) = self
2940                else {
2941                    unreachable!()
2942                };
2943
2944                let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
2945                    (input, quote!('static))
2946                } else {
2947                    (input, quote!('tick))
2948                };
2949
2950                let input_ident =
2951                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2952
2953                let (HydroNode::Fold { init, acc, .. }
2954                | HydroNode::FoldKeyed { init, acc, .. }
2955                | HydroNode::Scan { init, acc, .. }) = &*self
2956                else {
2957                    unreachable!()
2958                };
2959
2960                let fold_ident =
2961                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2962
2963                match builders_or_callback {
2964                    BuildersOrCallback::Builders(graph_builders) => {
2965                        if matches!(self, HydroNode::Fold { .. })
2966                            && self.metadata().location_kind.is_top_level()
2967                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2968                            && graph_builders.singleton_intermediates()
2969                        {
2970                            let builder = graph_builders.get_dfir_mut(&out_location);
2971
2972                            let acc: syn::Expr = parse_quote!({
2973                                let mut __inner = #acc;
2974                                move |__state, __value| {
2975                                    __inner(__state, __value);
2976                                    Some(__state.clone())
2977                                }
2978                            });
2979
2980                            builder.add_dfir(
2981                                parse_quote! {
2982                                    source_iter([(#init)()]) -> [0]#fold_ident;
2983                                    #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
2984                                    #fold_ident = chain();
2985                                },
2986                                None,
2987                                Some(&next_stmt_id.to_string()),
2988                            );
2989                        } else if matches!(self, HydroNode::FoldKeyed { .. })
2990                            && self.metadata().location_kind.is_top_level()
2991                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2992                            && graph_builders.singleton_intermediates()
2993                        {
2994                            let builder = graph_builders.get_dfir_mut(&out_location);
2995
2996                            let acc: syn::Expr = parse_quote!({
2997                                let mut __init = #init;
2998                                let mut __inner = #acc;
2999                                move |__state, __kv: (_, _)| {
3000                                    // TODO(shadaj): we can avoid the clone when the entry exists
3001                                    let __state = __state
3002                                        .entry(::std::clone::Clone::clone(&__kv.0))
3003                                        .or_insert_with(|| (__init)());
3004                                    __inner(__state, __kv.1);
3005                                    Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3006                                }
3007                            });
3008
3009                            builder.add_dfir(
3010                                parse_quote! {
3011                                    source_iter([(#init)()]) -> [0]#fold_ident;
3012                                    #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3013                                },
3014                                None,
3015                                Some(&next_stmt_id.to_string()),
3016                            );
3017                        } else {
3018                            let builder = graph_builders.get_dfir_mut(&out_location);
3019                            builder.add_dfir(
3020                                parse_quote! {
3021                                    #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3022                                },
3023                                None,
3024                                Some(&next_stmt_id.to_string()),
3025                            );
3026                        }
3027                    }
3028                    BuildersOrCallback::Callback(_, node_callback) => {
3029                        node_callback(self, next_stmt_id);
3030                    }
3031                }
3032
3033                *next_stmt_id += 1;
3034
3035                fold_ident
3036            }
3037
3038            HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3039                let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
3040                    parse_quote!(reduce)
3041                } else {
3042                    parse_quote!(reduce_keyed)
3043                };
3044
3045                let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = self
3046                else {
3047                    unreachable!()
3048                };
3049
3050                let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
3051                    (input, quote!('static))
3052                } else {
3053                    (input, quote!('tick))
3054                };
3055
3056                let input_ident =
3057                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3058
3059                let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*self
3060                else {
3061                    unreachable!()
3062                };
3063
3064                let reduce_ident =
3065                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3066
3067                match builders_or_callback {
3068                    BuildersOrCallback::Builders(graph_builders) => {
3069                        if matches!(self, HydroNode::Reduce { .. })
3070                            && self.metadata().location_kind.is_top_level()
3071                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3072                            && graph_builders.singleton_intermediates()
3073                        {
3074                            todo!(
3075                                "Reduce with optional intermediates is not yet supported in simulator"
3076                            );
3077                        } else if matches!(self, HydroNode::ReduceKeyed { .. })
3078                            && self.metadata().location_kind.is_top_level()
3079                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3080                            && graph_builders.singleton_intermediates()
3081                        {
3082                            todo!();
3083                        } else {
3084                            let builder = graph_builders.get_dfir_mut(&out_location);
3085                            builder.add_dfir(
3086                                parse_quote! {
3087                                    #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3088                                },
3089                                None,
3090                                Some(&next_stmt_id.to_string()),
3091                            );
3092                        }
3093                    }
3094                    BuildersOrCallback::Callback(_, node_callback) => {
3095                        node_callback(self, next_stmt_id);
3096                    }
3097                }
3098
3099                *next_stmt_id += 1;
3100
3101                reduce_ident
3102            }
3103
3104            HydroNode::ReduceKeyedWatermark {
3105                f,
3106                input,
3107                watermark,
3108                ..
3109            } => {
3110                let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
3111                    (input, quote!('static))
3112                } else {
3113                    (input, quote!('tick))
3114                };
3115
3116                let input_ident =
3117                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3118
3119                let watermark_ident =
3120                    watermark.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3121
3122                let chain_ident = syn::Ident::new(
3123                    &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3124                    Span::call_site(),
3125                );
3126
3127                let fold_ident =
3128                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3129
3130                match builders_or_callback {
3131                    BuildersOrCallback::Builders(graph_builders) => {
3132                        let builder = graph_builders.get_dfir_mut(&out_location);
3133                        // 1. Don't allow any values to be added to the map if the key <=the watermark
3134                        // 2. If the entry didn't exist in the BTreeMap, add it. Otherwise, call f.
3135                        //    If the watermark changed, delete all BTreeMap entries with a key < the watermark.
3136                        // 3. Convert the BTreeMap back into a stream of (k, v)
3137                        builder.add_dfir(
3138                            parse_quote! {
3139                                #chain_ident = chain();
3140                                #input_ident
3141                                    -> map(|x| (Some(x), None))
3142                                    -> [0]#chain_ident;
3143                                #watermark_ident
3144                                    -> map(|watermark| (None, Some(watermark)))
3145                                    -> [1]#chain_ident;
3146
3147                                #fold_ident = #chain_ident
3148                                    -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3149                                        let __reduce_keyed_fn = #f;
3150                                        move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3151                                            if let Some((k, v)) = opt_payload {
3152                                                if let Some(curr_watermark) = *opt_curr_watermark {
3153                                                    if k <= curr_watermark {
3154                                                        return;
3155                                                    }
3156                                                }
3157                                                match map.entry(k) {
3158                                                    ::std::collections::hash_map::Entry::Vacant(e) => {
3159                                                        e.insert(v);
3160                                                    }
3161                                                    ::std::collections::hash_map::Entry::Occupied(mut e) => {
3162                                                        __reduce_keyed_fn(e.get_mut(), v);
3163                                                    }
3164                                                }
3165                                            } else {
3166                                                let watermark = opt_watermark.unwrap();
3167                                                if let Some(curr_watermark) = *opt_curr_watermark {
3168                                                    if watermark <= curr_watermark {
3169                                                        return;
3170                                                    }
3171                                                }
3172                                                *opt_curr_watermark = opt_watermark;
3173                                                map.retain(|k, _| *k > watermark);
3174                                            }
3175                                        }
3176                                    })
3177                                    -> flat_map(|(map, _curr_watermark)| map);
3178                            },
3179                            None,
3180                            Some(&next_stmt_id.to_string()),
3181                        );
3182                    }
3183                    BuildersOrCallback::Callback(_, node_callback) => {
3184                        node_callback(self, next_stmt_id);
3185                    }
3186                }
3187
3188                *next_stmt_id += 1;
3189
3190                fold_ident
3191            }
3192
3193            HydroNode::Network {
3194                serialize_fn: serialize_pipeline,
3195                instantiate_fn,
3196                deserialize_fn: deserialize_pipeline,
3197                input,
3198                ..
3199            } => {
3200                let input_ident =
3201                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3202
3203                let receiver_stream_ident =
3204                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3205
3206                match builders_or_callback {
3207                    BuildersOrCallback::Builders(graph_builders) => {
3208                        let (sink_expr, source_expr) = match instantiate_fn {
3209                            DebugInstantiate::Building => (
3210                                syn::parse_quote!(DUMMY_SINK),
3211                                syn::parse_quote!(DUMMY_SOURCE),
3212                            ),
3213
3214                            DebugInstantiate::Finalized(finalized) => {
3215                                (finalized.sink.clone(), finalized.source.clone())
3216                            }
3217                        };
3218
3219                        graph_builders.create_network(
3220                            &input.metadata().location_kind,
3221                            &out_location,
3222                            input_ident,
3223                            &receiver_stream_ident,
3224                            serialize_pipeline,
3225                            sink_expr,
3226                            source_expr,
3227                            deserialize_pipeline,
3228                            *next_stmt_id,
3229                        );
3230                    }
3231                    BuildersOrCallback::Callback(_, node_callback) => {
3232                        node_callback(self, next_stmt_id);
3233                    }
3234                }
3235
3236                *next_stmt_id += 1;
3237
3238                receiver_stream_ident
3239            }
3240
3241            HydroNode::ExternalInput {
3242                instantiate_fn,
3243                deserialize_fn: deserialize_pipeline,
3244                ..
3245            } => {
3246                let receiver_stream_ident =
3247                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3248
3249                match builders_or_callback {
3250                    BuildersOrCallback::Builders(graph_builders) => {
3251                        let (_, source_expr) = match instantiate_fn {
3252                            DebugInstantiate::Building => (
3253                                syn::parse_quote!(DUMMY_SINK),
3254                                syn::parse_quote!(DUMMY_SOURCE),
3255                            ),
3256
3257                            DebugInstantiate::Finalized(finalized) => {
3258                                (finalized.sink.clone(), finalized.source.clone())
3259                            }
3260                        };
3261
3262                        graph_builders.create_external_source(
3263                            &out_location,
3264                            source_expr,
3265                            &receiver_stream_ident,
3266                            deserialize_pipeline,
3267                            *next_stmt_id,
3268                        );
3269                    }
3270                    BuildersOrCallback::Callback(_, node_callback) => {
3271                        node_callback(self, next_stmt_id);
3272                    }
3273                }
3274
3275                *next_stmt_id += 1;
3276
3277                receiver_stream_ident
3278            }
3279
3280            HydroNode::Counter {
3281                tag,
3282                duration,
3283                prefix,
3284                input,
3285                ..
3286            } => {
3287                let input_ident =
3288                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3289
3290                let counter_ident =
3291                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3292
3293                match builders_or_callback {
3294                    BuildersOrCallback::Builders(graph_builders) => {
3295                        let builder = graph_builders.get_dfir_mut(&out_location);
3296                        builder.add_dfir(
3297                            parse_quote! {
3298                                #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3299                            },
3300                            None,
3301                            Some(&next_stmt_id.to_string()),
3302                        );
3303                    }
3304                    BuildersOrCallback::Callback(_, node_callback) => {
3305                        node_callback(self, next_stmt_id);
3306                    }
3307                }
3308
3309                *next_stmt_id += 1;
3310
3311                counter_ident
3312            }
3313        }
3314    }
3315
3316    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3317        match self {
3318            HydroNode::Placeholder => {
3319                panic!()
3320            }
3321            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3322            HydroNode::Source { source, .. } => match source {
3323                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3324                HydroSource::ExternalNetwork()
3325                | HydroSource::Spin()
3326                | HydroSource::ClusterMembers(_) => {} // TODO: what goes here?
3327            },
3328            HydroNode::SingletonSource { value, .. } => {
3329                transform(value);
3330            }
3331            HydroNode::CycleSource { .. }
3332            | HydroNode::Tee { .. }
3333            | HydroNode::YieldConcat { .. }
3334            | HydroNode::BeginAtomic { .. }
3335            | HydroNode::EndAtomic { .. }
3336            | HydroNode::Batch { .. }
3337            | HydroNode::Chain { .. }
3338            | HydroNode::ChainFirst { .. }
3339            | HydroNode::CrossProduct { .. }
3340            | HydroNode::CrossSingleton { .. }
3341            | HydroNode::ResolveFutures { .. }
3342            | HydroNode::ResolveFuturesOrdered { .. }
3343            | HydroNode::Join { .. }
3344            | HydroNode::Difference { .. }
3345            | HydroNode::AntiJoin { .. }
3346            | HydroNode::DeferTick { .. }
3347            | HydroNode::Enumerate { .. }
3348            | HydroNode::Unique { .. }
3349            | HydroNode::Sort { .. } => {}
3350            HydroNode::Map { f, .. }
3351            | HydroNode::FlatMap { f, .. }
3352            | HydroNode::Filter { f, .. }
3353            | HydroNode::FilterMap { f, .. }
3354            | HydroNode::Inspect { f, .. }
3355            | HydroNode::Reduce { f, .. }
3356            | HydroNode::ReduceKeyed { f, .. }
3357            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3358                transform(f);
3359            }
3360            HydroNode::Fold { init, acc, .. }
3361            | HydroNode::Scan { init, acc, .. }
3362            | HydroNode::FoldKeyed { init, acc, .. } => {
3363                transform(init);
3364                transform(acc);
3365            }
3366            HydroNode::Network {
3367                serialize_fn,
3368                deserialize_fn,
3369                ..
3370            } => {
3371                if let Some(serialize_fn) = serialize_fn {
3372                    transform(serialize_fn);
3373                }
3374                if let Some(deserialize_fn) = deserialize_fn {
3375                    transform(deserialize_fn);
3376                }
3377            }
3378            HydroNode::ExternalInput { deserialize_fn, .. } => {
3379                if let Some(deserialize_fn) = deserialize_fn {
3380                    transform(deserialize_fn);
3381                }
3382            }
3383            HydroNode::Counter { duration, .. } => {
3384                transform(duration);
3385            }
3386        }
3387    }
3388
3389    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3390        &self.metadata().op
3391    }
3392
3393    pub fn metadata(&self) -> &HydroIrMetadata {
3394        match self {
3395            HydroNode::Placeholder => {
3396                panic!()
3397            }
3398            HydroNode::Cast { metadata, .. } => metadata,
3399            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3400            HydroNode::Source { metadata, .. } => metadata,
3401            HydroNode::SingletonSource { metadata, .. } => metadata,
3402            HydroNode::CycleSource { metadata, .. } => metadata,
3403            HydroNode::Tee { metadata, .. } => metadata,
3404            HydroNode::YieldConcat { metadata, .. } => metadata,
3405            HydroNode::BeginAtomic { metadata, .. } => metadata,
3406            HydroNode::EndAtomic { metadata, .. } => metadata,
3407            HydroNode::Batch { metadata, .. } => metadata,
3408            HydroNode::Chain { metadata, .. } => metadata,
3409            HydroNode::ChainFirst { metadata, .. } => metadata,
3410            HydroNode::CrossProduct { metadata, .. } => metadata,
3411            HydroNode::CrossSingleton { metadata, .. } => metadata,
3412            HydroNode::Join { metadata, .. } => metadata,
3413            HydroNode::Difference { metadata, .. } => metadata,
3414            HydroNode::AntiJoin { metadata, .. } => metadata,
3415            HydroNode::ResolveFutures { metadata, .. } => metadata,
3416            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3417            HydroNode::Map { metadata, .. } => metadata,
3418            HydroNode::FlatMap { metadata, .. } => metadata,
3419            HydroNode::Filter { metadata, .. } => metadata,
3420            HydroNode::FilterMap { metadata, .. } => metadata,
3421            HydroNode::DeferTick { metadata, .. } => metadata,
3422            HydroNode::Enumerate { metadata, .. } => metadata,
3423            HydroNode::Inspect { metadata, .. } => metadata,
3424            HydroNode::Unique { metadata, .. } => metadata,
3425            HydroNode::Sort { metadata, .. } => metadata,
3426            HydroNode::Scan { metadata, .. } => metadata,
3427            HydroNode::Fold { metadata, .. } => metadata,
3428            HydroNode::FoldKeyed { metadata, .. } => metadata,
3429            HydroNode::Reduce { metadata, .. } => metadata,
3430            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3431            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3432            HydroNode::ExternalInput { metadata, .. } => metadata,
3433            HydroNode::Network { metadata, .. } => metadata,
3434            HydroNode::Counter { metadata, .. } => metadata,
3435        }
3436    }
3437
3438    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3439        &mut self.metadata_mut().op
3440    }
3441
3442    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3443        match self {
3444            HydroNode::Placeholder => {
3445                panic!()
3446            }
3447            HydroNode::Cast { metadata, .. } => metadata,
3448            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3449            HydroNode::Source { metadata, .. } => metadata,
3450            HydroNode::SingletonSource { metadata, .. } => metadata,
3451            HydroNode::CycleSource { metadata, .. } => metadata,
3452            HydroNode::Tee { metadata, .. } => metadata,
3453            HydroNode::YieldConcat { metadata, .. } => metadata,
3454            HydroNode::BeginAtomic { metadata, .. } => metadata,
3455            HydroNode::EndAtomic { metadata, .. } => metadata,
3456            HydroNode::Batch { metadata, .. } => metadata,
3457            HydroNode::Chain { metadata, .. } => metadata,
3458            HydroNode::ChainFirst { metadata, .. } => metadata,
3459            HydroNode::CrossProduct { metadata, .. } => metadata,
3460            HydroNode::CrossSingleton { metadata, .. } => metadata,
3461            HydroNode::Join { metadata, .. } => metadata,
3462            HydroNode::Difference { metadata, .. } => metadata,
3463            HydroNode::AntiJoin { metadata, .. } => metadata,
3464            HydroNode::ResolveFutures { metadata, .. } => metadata,
3465            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3466            HydroNode::Map { metadata, .. } => metadata,
3467            HydroNode::FlatMap { metadata, .. } => metadata,
3468            HydroNode::Filter { metadata, .. } => metadata,
3469            HydroNode::FilterMap { metadata, .. } => metadata,
3470            HydroNode::DeferTick { metadata, .. } => metadata,
3471            HydroNode::Enumerate { metadata, .. } => metadata,
3472            HydroNode::Inspect { metadata, .. } => metadata,
3473            HydroNode::Unique { metadata, .. } => metadata,
3474            HydroNode::Sort { metadata, .. } => metadata,
3475            HydroNode::Scan { metadata, .. } => metadata,
3476            HydroNode::Fold { metadata, .. } => metadata,
3477            HydroNode::FoldKeyed { metadata, .. } => metadata,
3478            HydroNode::Reduce { metadata, .. } => metadata,
3479            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3480            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3481            HydroNode::ExternalInput { metadata, .. } => metadata,
3482            HydroNode::Network { metadata, .. } => metadata,
3483            HydroNode::Counter { metadata, .. } => metadata,
3484        }
3485    }
3486
3487    pub fn input(&self) -> Vec<&HydroNode> {
3488        match self {
3489            HydroNode::Placeholder => {
3490                panic!()
3491            }
3492            HydroNode::Source { .. }
3493            | HydroNode::SingletonSource { .. }
3494            | HydroNode::ExternalInput { .. }
3495            | HydroNode::CycleSource { .. }
3496            | HydroNode::Tee { .. } => {
3497                // Tee should find its input in separate special ways
3498                vec![]
3499            }
3500            HydroNode::Cast { inner, .. }
3501            | HydroNode::ObserveNonDet { inner, .. }
3502            | HydroNode::YieldConcat { inner, .. }
3503            | HydroNode::BeginAtomic { inner, .. }
3504            | HydroNode::EndAtomic { inner, .. }
3505            | HydroNode::Batch { inner, .. } => {
3506                vec![inner]
3507            }
3508            HydroNode::Chain { first, second, .. } => {
3509                vec![first, second]
3510            }
3511            HydroNode::ChainFirst { first, second, .. } => {
3512                vec![first, second]
3513            }
3514            HydroNode::CrossProduct { left, right, .. }
3515            | HydroNode::CrossSingleton { left, right, .. }
3516            | HydroNode::Join { left, right, .. } => {
3517                vec![left, right]
3518            }
3519            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3520                vec![pos, neg]
3521            }
3522            HydroNode::Map { input, .. }
3523            | HydroNode::FlatMap { input, .. }
3524            | HydroNode::Filter { input, .. }
3525            | HydroNode::FilterMap { input, .. }
3526            | HydroNode::Sort { input, .. }
3527            | HydroNode::DeferTick { input, .. }
3528            | HydroNode::Enumerate { input, .. }
3529            | HydroNode::Inspect { input, .. }
3530            | HydroNode::Unique { input, .. }
3531            | HydroNode::Network { input, .. }
3532            | HydroNode::Counter { input, .. }
3533            | HydroNode::ResolveFutures { input, .. }
3534            | HydroNode::ResolveFuturesOrdered { input, .. }
3535            | HydroNode::Fold { input, .. }
3536            | HydroNode::FoldKeyed { input, .. }
3537            | HydroNode::Reduce { input, .. }
3538            | HydroNode::ReduceKeyed { input, .. }
3539            | HydroNode::Scan { input, .. } => {
3540                vec![input]
3541            }
3542            HydroNode::ReduceKeyedWatermark {
3543                input, watermark, ..
3544            } => {
3545                vec![input, watermark]
3546            }
3547        }
3548    }
3549
3550    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3551        self.input()
3552            .iter()
3553            .map(|input_node| input_node.metadata())
3554            .collect()
3555    }
3556
3557    pub fn print_root(&self) -> String {
3558        match self {
3559            HydroNode::Placeholder => {
3560                panic!()
3561            }
3562            HydroNode::Cast { .. } => "Cast()".to_string(),
3563            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3564            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3565            HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3566            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3567            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3568            HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3569            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3570            HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3571            HydroNode::Batch { .. } => "Batch()".to_string(),
3572            HydroNode::Chain { first, second, .. } => {
3573                format!("Chain({}, {})", first.print_root(), second.print_root())
3574            }
3575            HydroNode::ChainFirst { first, second, .. } => {
3576                format!(
3577                    "ChainFirst({}, {})",
3578                    first.print_root(),
3579                    second.print_root()
3580                )
3581            }
3582            HydroNode::CrossProduct { left, right, .. } => {
3583                format!(
3584                    "CrossProduct({}, {})",
3585                    left.print_root(),
3586                    right.print_root()
3587                )
3588            }
3589            HydroNode::CrossSingleton { left, right, .. } => {
3590                format!(
3591                    "CrossSingleton({}, {})",
3592                    left.print_root(),
3593                    right.print_root()
3594                )
3595            }
3596            HydroNode::Join { left, right, .. } => {
3597                format!("Join({}, {})", left.print_root(), right.print_root())
3598            }
3599            HydroNode::Difference { pos, neg, .. } => {
3600                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3601            }
3602            HydroNode::AntiJoin { pos, neg, .. } => {
3603                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3604            }
3605            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3606            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3607            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3608            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3609            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3610            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3611            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3612            HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3613            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3614            HydroNode::Unique { .. } => "Unique()".to_string(),
3615            HydroNode::Sort { .. } => "Sort()".to_string(),
3616            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3617            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3618            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3619            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3620            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3621            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3622            HydroNode::Network { .. } => "Network()".to_string(),
3623            HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3624            HydroNode::Counter { tag, duration, .. } => {
3625                format!("Counter({:?}, {:?})", tag, duration)
3626            }
3627        }
3628    }
3629}
3630
/// Allocates a port on each endpoint of a network edge and asks the deploy
/// backend `D` for the pieces needed to wire that edge up.
///
/// The (`from_location`, `to_location`) pair selects the family of backend
/// calls:
///
/// | from    | to      | calls   |
/// |---------|---------|---------|
/// | process | process | `o2o_*` |
/// | process | cluster | `o2m_*` |
/// | cluster | process | `m2o_*` |
/// | cluster | cluster | `m2m_*` |
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// backend's `*_sink_source` call, followed by the boxed closure returned by
/// its `*_connect` call.
///
/// # Panics
///
/// - If a process or cluster ID referenced by either location is missing from
///   `processes` / `clusters`.
/// - If either location is a `Tick` or `Atomic` location; only processes and
///   clusters are accepted as network endpoints here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Resolve an ID to its deployed node, failing loudly when the graph refers
    // to something that was never instantiated.
    let process_node = |id: &usize| {
        processes
            .get(id)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", id)
            })
            .clone()
    };
    let cluster_node = |id: &usize| {
        clusters
            .get(id)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", id)
            })
            .clone()
    };

    // In every arm the sender is resolved before the receiver, and the
    // sink/source expressions are built before the connect closure.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Ticks and atomic regions are not valid endpoints. Say which side was
        // wrong instead of failing with a bare "explicit panic".
        (LocationId::Tick(_, _), _) => panic!(
            "cannot instantiate a network: `from_location` is a tick, expected a process or cluster"
        ),
        (_, LocationId::Tick(_, _)) => panic!(
            "cannot instantiate a network: `to_location` is a tick, expected a process or cluster"
        ),
        (LocationId::Atomic(_), _) => panic!(
            "cannot instantiate a network: `from_location` is atomic, expected a process or cluster"
        ),
        (_, LocationId::Atomic(_)) => panic!(
            "cannot instantiate a network: `to_location` is atomic, expected a process or cluster"
        ),
    };
    (sink, source, connect_fn)
}
3737
#[cfg(test)]
mod test {
    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of [`HydroNode`] so accidental growth is caught.
    /// The expected value only holds when the `build` feature's fields are
    /// compiled in, hence the conditional `ignore`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = std::mem::size_of::<HydroNode>();
        assert_eq!(node_bytes, 272);
    }

    /// Pins the in-memory size of [`HydroRoot`], under the same feature
    /// condition as the node-size test.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = std::mem::size_of::<HydroRoot>();
        assert_eq!(root_bytes, 168);
    }

    #[test]
    fn test_simplify_q_macro_basic() {
        // An expression with no `q!` call in it must come back unchanged.
        let plain = syn::parse_str::<syn::Expr>("x + y").unwrap();
        let simplified = simplify_q_macro(plain.clone());
        assert_eq!(simplified, plain);
    }

    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Splice a real `q!` closure so the simplifier is fed what stageleft
        // actually emits rather than a hand-written approximation.
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        // The visitor is expected to recognise the
        // `stageleft::runtime_support::fn_*` pattern and collapse the call to
        // `q!(...)`; the exact tokens are pinned by the stored snapshot.
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    #[test]
    fn test_closure_no_pipe_at_start() {
        // The quoted body is a block that ends in a closure, so it does not
        // begin with a `|`.
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}