hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
/// Wrapper that displays only the tokens of a parsed expr.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39    fn from(expr: syn::Expr) -> Self {
40        Self(Box::new(expr))
41    }
42}
43
44impl Deref for DebugExpr {
45    type Target = syn::Expr;
46
47    fn deref(&self) -> &Self::Target {
48        &self.0
49    }
50}
51
52impl ToTokens for DebugExpr {
53    fn to_tokens(&self, tokens: &mut TokenStream) {
54        self.0.to_tokens(tokens);
55    }
56}
57
58impl Debug for DebugExpr {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        write!(f, "{}", self.0.to_token_stream())
61    }
62}
63
64impl Display for DebugExpr {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        let original = self.0.as_ref().clone();
67        let simplified = simplify_q_macro(original);
68
69        // For now, just use quote formatting without trying to parse as a statement
70        // This avoids the syn::parse_quote! issues entirely
71        write!(f, "q!({})", quote::quote!(#simplified))
72    }
73}
74
75/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77    // Try to parse the token string as a syn::Expr
78    // Use a visitor to simplify q! macro expansions
79    let mut simplifier = QMacroSimplifier::new();
80    simplifier.visit_expr_mut(&mut expr);
81
82    // If we found and simplified a q! macro, return the simplified version
83    if let Some(simplified) = simplifier.simplified_result {
84        simplified
85    } else {
86        expr
87    }
88}
89
/// AST visitor that simplifies q! macro expansions.
///
/// The visitor stops at the first call it recognises: once `simplified_result`
/// is set, further `visit_expr_mut` calls return immediately.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// Expression extracted from the first recognised call, or `None` if no
    /// such call has been visited yet.
    pub simplified_result: Option<syn::Expr>,
}
95
96impl QMacroSimplifier {
97    pub fn new() -> Self {
98        Self::default()
99    }
100}
101
102impl VisitMut for QMacroSimplifier {
103    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104        // Check if we already found a result to avoid further processing
105        if self.simplified_result.is_some() {
106            return;
107        }
108
109        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110            // Look for calls to stageleft::runtime_support::fn*
111            && self.is_stageleft_runtime_support_call(&path_expr.path)
112            // Try to extract the closure from the arguments
113            && let Some(closure) = self.extract_closure_from_args(&call.args)
114        {
115            self.simplified_result = Some(closure);
116            return;
117        }
118
119        // Continue visiting child expressions using the default implementation
120        // Use the default visitor to avoid infinite recursion
121        syn::visit_mut::visit_expr_mut(self, expr);
122    }
123}
124
125impl QMacroSimplifier {
126    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127        // Check if this is a call to stageleft::runtime_support::fn*
128        if let Some(last_segment) = path.segments.last() {
129            let fn_name = last_segment.ident.to_string();
130            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
131            fn_name.contains("_type_hint")
132                && path.segments.len() > 2
133                && path.segments[0].ident == "stageleft"
134                && path.segments[1].ident == "runtime_support"
135        } else {
136            false
137        }
138    }
139
140    fn extract_closure_from_args(
141        &self,
142        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143    ) -> Option<syn::Expr> {
144        // Look through the arguments for a closure expression
145        for arg in args {
146            if let syn::Expr::Closure(_) = arg {
147                return Some(arg.clone());
148            }
149            // Also check for closures nested in other expressions (like blocks)
150            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151                return Some(closure_expr);
152            }
153        }
154        None
155    }
156
157    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158        let mut visitor = ClosureFinder {
159            found_closure: None,
160            prefer_inner_blocks: true,
161        };
162        visitor.visit_expr(expr);
163        visitor.found_closure
164    }
165}
166
/// Visitor that finds closures in expressions with special block handling
struct ClosureFinder {
    /// First match recorded by the visitor. This is either a closure expression
    /// or, when `prefer_inner_blocks` applies, a nested block that contains one.
    found_closure: Option<syn::Expr>,
    /// When `true`, visiting a block first checks its directly nested block
    /// statements and records the whole nested block if it contains a closure.
    prefer_inner_blocks: bool,
}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175        // If we already found a closure, don't continue searching
176        if self.found_closure.is_some() {
177            return;
178        }
179
180        match expr {
181            syn::Expr::Closure(_) => {
182                self.found_closure = Some(expr.clone());
183            }
184            syn::Expr::Block(block) if self.prefer_inner_blocks => {
185                // Special handling for blocks - look for inner blocks that contain closures
186                for stmt in &block.block.stmts {
187                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
188                        && let syn::Expr::Block(_) = stmt_expr
189                    {
190                        // Check if this nested block contains a closure
191                        let mut inner_visitor = ClosureFinder {
192                            found_closure: None,
193                            prefer_inner_blocks: false, // Avoid infinite recursion
194                        };
195                        inner_visitor.visit_expr(stmt_expr);
196                        if inner_visitor.found_closure.is_some() {
197                            // Found a closure in an inner block, return that block
198                            self.found_closure = Some(stmt_expr.clone());
199                            return;
200                        }
201                    }
202                }
203
204                // If no inner block with closure found, continue with normal visitation
205                visit::visit_expr(self, expr);
206
207                // If we found a closure, just return the closure itself, not the whole block
208                // unless we're in the special case where we want the containing block
209                if self.found_closure.is_some() {
210                    // The closure was found during visitation, no need to wrap in block
211                }
212            }
213            _ => {
214                // Use default visitor behavior for all other expressions
215                visit::visit_expr(self, expr);
216            }
217        }
218    }
219}
220
/// Debug displays the type's tokens.
///
/// Boxes `syn::Type` which is ~320 bytes.
///
/// Dereferences to the wrapped `syn::Type` and forwards `ToTokens` to it.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228    fn from(t: syn::Type) -> Self {
229        Self(Box::new(t))
230    }
231}
232
233impl Deref for DebugType {
234    type Target = syn::Type;
235
236    fn deref(&self) -> &Self::Target {
237        &self.0
238    }
239}
240
241impl ToTokens for DebugType {
242    fn to_tokens(&self, tokens: &mut TokenStream) {
243        self.0.to_tokens(tokens);
244    }
245}
246
247impl Debug for DebugType {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        write!(f, "{}", self.0.to_token_stream())
250    }
251}
252
/// Instantiation state of a network edge in the IR.
///
/// `HydroRoot::compile_network` replaces `Building` with `Finalized`, and
/// `HydroRoot::connect_network` panics if it still finds `Building`.
pub enum DebugInstantiate {
    /// The network has not been instantiated yet.
    Building,
    /// The network has been instantiated; holds the sink/source expressions and
    /// the pending connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
257
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side of the connection.
    sink: syn::Expr,
    /// Expression for the receiving side of the connection.
    source: syn::Expr,
    /// One-shot callback that performs the connection. `connect_network` takes
    /// it out and calls it, leaving `None` behind.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272    fn from(f: DebugInstantiateFinalized) -> Self {
273        Self::Finalized(Box::new(f))
274    }
275}
276
277impl Debug for DebugInstantiate {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        write!(f, "<network instantiate>")
280    }
281}
282
283impl Hash for DebugInstantiate {
284    fn hash<H: Hasher>(&self, _state: &mut H) {
285        // Do nothing
286    }
287}
288
289impl Clone for DebugInstantiate {
290    fn clone(&self) -> Self {
291        match self {
292            DebugInstantiate::Building => DebugInstantiate::Building,
293            DebugInstantiate::Finalized(_) => {
294                panic!("DebugInstantiate::Finalized should not be cloned")
295            }
296        }
297    }
298}
299
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression.
    /// NOTE(review): presumably the expression evaluates to a stream; confirm at the emit site.
    Stream(DebugExpr),
    /// Source fed from an external network; carries no data of its own.
    ExternalNetwork(),
    /// Source described by an expression.
    /// NOTE(review): presumably the expression evaluates to an iterator; confirm at the emit site.
    Iter(DebugExpr),
    /// Source with no payload.
    /// NOTE(review): semantics are not visible in this part of the file.
    Spin(),
}
308
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
pub trait DfirBuilder {
    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code that connects `in_ident` (at `in_location`) to `out_ident`
    /// (at `out_location`) for a batching operator.
    ///
    /// The `BTreeMap<usize, FlatGraphBuilder>` implementation simply aliases
    /// `out_ident` to `in_ident` in the builder for `in_location`'s root.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code that connects `in_ident` (at `in_location`) to `out_ident`
    /// when a collection is yielded out of a tick.
    ///
    /// The `BTreeMap<usize, FlatGraphBuilder>` implementation aliases `out_ident`
    /// to `in_ident` in the builder for `in_location`'s root.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code that connects `in_ident` to `out_ident` at `location` for
    /// an operator that observes non-determinism.
    ///
    /// The `BTreeMap<usize, FlatGraphBuilder>` implementation aliases `out_ident`
    /// to `in_ident`.
    fn observe_nondet(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
    );

    /// Emits both halves of a network edge: a sender at `from` that feeds
    /// `input_ident` into `sink`, and a receiver at `to` that defines `out_ident`
    /// from `source`.
    ///
    /// `serialize` / `deserialize`, when present, are applied on the sending /
    /// receiving side respectively. `tag_id` is used to build the operator tags.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits a receiver at `on` that defines `out_ident` from `source_expr`,
    /// applying `deserialize` when present. `tag_id` is used to build the operator tag.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits a sender at `on` that feeds `input_ident` into `sink_expr`,
    /// applying `serialize` when present. `tag_id` is used to build the operator tag.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    );
}
377
/// Builder set that keeps one `FlatGraphBuilder` per root location, keyed by raw id.
///
/// `batch`, `yield_from_tick` and `observe_nondet` are plain aliases here
/// (`out = in;`); the network methods emit `dest_sink` / `source_stream` pipelines.
#[cfg(feature = "build")]
impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_id = location.root().raw_id();
        self.entry(root_id).or_default()
    }

    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let root = in_location.root();
        self.get_dfir_mut(root).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let root = in_location.root();
        self.get_dfir_mut(root).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    fn observe_nondet(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        // The operator tag separates send and receive, which otherwise have the
        // same next_stmt_id.
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        let sender = self.get_dfir_mut(from);
        match serialize {
            Some(ser) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#ser) -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }

        let receiver = self.get_dfir_mut(to);
        match deserialize {
            Some(de) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#de);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver = self.get_dfir_mut(on);

        match deserialize {
            Some(de) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#de);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        // The operator tag separates send and receive, which otherwise have the
        // same next_stmt_id.
        let send_tag = format!("send{}", tag_id);
        let sender = self.get_dfir_mut(on);

        match serialize {
            Some(ser) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#ser) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }
    }
}
547
/// Target of an `emit_core` traversal: either real DFIR builders to emit code
/// into, or a pair of callbacks invoked instead of emitting.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Call `L` for each root and `N` for each node; the `usize` is the
    /// current statement id counter.
    Callback(L, N),
}
557
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Applies `f` to every element of `input` (emitted as `for_each(f)`).
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to an external, identified by `to_external_id` / `to_key`.
    ///
    /// `instantiate_fn` starts as `Building` and is finalized by
    /// `compile_network`; `serialize_fn`, when present, is applied before sending.
    SendExternal {
        to_external_id: usize,
        to_key: usize,
        to_many: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds `input` into the sink described by the `sink` expression.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds `input` into the cycle named `ident`.
    /// NOTE(review): the matching cycle source is not visible in this part of the file.
    CycleSink {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
588
589impl HydroRoot {
    /// Instantiates every network edge reachable from this root for deployment backend `D`.
    ///
    /// Walks the graph bottom-up and, for each `HydroRoot::SendExternal`,
    /// `HydroNode::Network` and `HydroNode::ExternalInput`, replaces a
    /// `DebugInstantiate::Building` marker with a `Finalized` value holding the
    /// sink expression, the source expression and a deferred connect callback
    /// (run later by `connect_network`).
    ///
    /// `processes`, `clusters` and `externals` map raw location ids to the
    /// backend's instantiated nodes. `extra_stmts` collects additional
    /// statements per process id (used for many-to-one external sources).
    ///
    /// # Panics
    ///
    /// Panics if an edge is already finalized, if a referenced process or
    /// external is missing from the maps, or if an external edge touches a
    /// location that is neither a process nor a cluster. External edges on
    /// clusters hit `todo!()`.
    #[cfg(feature = "build")]
    pub fn compile_network<'a, D>(
        &mut self,
        compile_env: &D::CompileEnv,
        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
        seen_tees: &mut SeenTees,
        processes: &HashMap<usize, D::Process>,
        clusters: &HashMap<usize, D::Cluster>,
        externals: &HashMap<usize, D::External>,
    ) where
        D: Deploy<'a>,
    {
        self.transform_bottom_up(
            // Root callback: only `SendExternal` roots carry a network to instantiate.
            &mut |l| {
                if let HydroRoot::SendExternal {
                    input,
                    to_external_id,
                    to_key,
                    to_many,
                    instantiate_fn,
                    ..
                } = l
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let to_node = externals
                                .get(to_external_id)
                                .unwrap_or_else(|| {
                                    panic!("A external used in the graph was not instantiated: {}", to_external_id)
                                })
                                .clone();

                            // The sending side is the location of the input node.
                            match input.metadata().location_kind.root() {
                                LocationId::Process(process_id) => {
                                    if *to_many {
                                        // Many-mode: the sink is derived from the
                                        // `<external id>_<key>` name and there is nothing
                                        // to connect, so the connect callback is a no-op.
                                        (
                                            (
                                                D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
                                                parse_quote!(DUMMY),
                                            ),
                                            Box::new(|| {}) as Box<dyn FnOnce()>,
                                        )
                                    } else {
                                        let from_node = processes
                                            .get(process_id)
                                            .unwrap_or_else(|| {
                                                panic!("A process used in the graph was not instantiated: {}", process_id)
                                            })
                                            .clone();

                                        let sink_port = D::allocate_process_port(&from_node);
                                        let source_port = D::allocate_external_port(&to_node);

                                        // Register the external's port under the key.
                                        to_node.register(*to_key, source_port.clone());

                                        // Only the sink side is emitted for a send, so the
                                        // source expression is a `DUMMY` placeholder.
                                        (
                                            (
                                                D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
                                                parse_quote!(DUMMY),
                                            ),
                                            D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
                                        )
                                    }
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            // Node callback: handles location-to-location networks and external inputs.
            &mut |n| {
                if let HydroNode::Network {
                    input,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    // Sender is the input's root location, receiver is this node's.
                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => instantiate_network::<D>(
                            input.metadata().location_kind.root(),
                            metadata.location_kind.root(),
                            processes,
                            clusters,
                            compile_env,
                        ),

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                } else if let HydroNode::ExternalInput {
                    from_external_id,
                    from_key,
                    from_many,
                    codec_type,
                    port_hint,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let from_node = externals
                                .get(from_external_id)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "A external used in the graph was not instantiated: {}",
                                        from_external_id
                                    )
                                })
                                .clone();

                            // The receiving side is the location of this node.
                            match metadata.location_kind.root() {
                                LocationId::Process(process_id) => {
                                    let to_node = processes
                                        .get(process_id)
                                        .unwrap_or_else(|| {
                                            panic!("A process used in the graph was not instantiated: {}", process_id)
                                        })
                                        .clone();

                                    let sink_port = D::allocate_external_port(&from_node);
                                    let source_port = D::allocate_process_port(&to_node);

                                    // Register the external's port under the key.
                                    from_node.register(*from_key, sink_port.clone());

                                    // Only the source side is emitted for an input, so the
                                    // sink expression is a `DUMMY` placeholder.
                                    (
                                        (
                                            parse_quote!(DUMMY),
                                            if *from_many {
                                                D::e2o_many_source(
                                                    compile_env,
                                                    extra_stmts.entry(*process_id).or_default(),
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_id, *from_key)
                                                )
                                            } else {
                                                D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port)
                                            },
                                        ),
                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
                                    )
                                }
                                LocationId::Cluster(_) => todo!(),
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                }
            },
            seen_tees,
            false,
        );
    }
771
772    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
773        self.transform_bottom_up(
774            &mut |l| {
775                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
776                    match instantiate_fn {
777                        DebugInstantiate::Building => panic!("network not built"),
778
779                        DebugInstantiate::Finalized(finalized) => {
780                            (finalized.connect_fn.take().unwrap())();
781                        }
782                    }
783                }
784            },
785            &mut |n| {
786                if let HydroNode::Network { instantiate_fn, .. }
787                | HydroNode::ExternalInput { instantiate_fn, .. } = n
788                {
789                    match instantiate_fn {
790                        DebugInstantiate::Building => panic!("network not built"),
791
792                        DebugInstantiate::Finalized(finalized) => {
793                            (finalized.connect_fn.take().unwrap())();
794                        }
795                    }
796                }
797            },
798            seen_tees,
799            false,
800        );
801    }
802
803    pub fn transform_bottom_up(
804        &mut self,
805        transform_root: &mut impl FnMut(&mut HydroRoot),
806        transform_node: &mut impl FnMut(&mut HydroNode),
807        seen_tees: &mut SeenTees,
808        check_well_formed: bool,
809    ) {
810        self.transform_children(
811            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
812            seen_tees,
813        );
814
815        transform_root(self);
816    }
817
818    pub fn transform_children(
819        &mut self,
820        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
821        seen_tees: &mut SeenTees,
822    ) {
823        match self {
824            HydroRoot::ForEach { input, .. }
825            | HydroRoot::SendExternal { input, .. }
826            | HydroRoot::DestSink { input, .. }
827            | HydroRoot::CycleSink { input, .. } => {
828                transform(input, seen_tees);
829            }
830        }
831    }
832
833    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
834        match self {
835            HydroRoot::ForEach {
836                f,
837                input,
838                op_metadata,
839            } => HydroRoot::ForEach {
840                f: f.clone(),
841                input: Box::new(input.deep_clone(seen_tees)),
842                op_metadata: op_metadata.clone(),
843            },
844            HydroRoot::SendExternal {
845                to_external_id,
846                to_key,
847                to_many,
848                serialize_fn,
849                instantiate_fn,
850                input,
851                op_metadata,
852            } => HydroRoot::SendExternal {
853                to_external_id: *to_external_id,
854                to_key: *to_key,
855                to_many: *to_many,
856                serialize_fn: serialize_fn.clone(),
857                instantiate_fn: instantiate_fn.clone(),
858                input: Box::new(input.deep_clone(seen_tees)),
859                op_metadata: op_metadata.clone(),
860            },
861            HydroRoot::DestSink {
862                sink,
863                input,
864                op_metadata,
865            } => HydroRoot::DestSink {
866                sink: sink.clone(),
867                input: Box::new(input.deep_clone(seen_tees)),
868                op_metadata: op_metadata.clone(),
869            },
870            HydroRoot::CycleSink {
871                ident,
872                input,
873                op_metadata,
874            } => HydroRoot::CycleSink {
875                ident: ident.clone(),
876                input: Box::new(input.deep_clone(seen_tees)),
877                op_metadata: op_metadata.clone(),
878            },
879        }
880    }
881
882    #[cfg(feature = "build")]
883    pub fn emit(
884        &mut self,
885        graph_builders: &mut dyn DfirBuilder,
886        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
887        next_stmt_id: &mut usize,
888    ) {
889        self.emit_core(
890            &mut BuildersOrCallback::Builders::<
891                fn(&mut HydroRoot, &mut usize),
892                fn(&mut HydroNode, &mut usize),
893            >(graph_builders),
894            built_tees,
895            next_stmt_id,
896        );
897    }
898
899    #[cfg(feature = "build")]
900    pub fn emit_core(
901        &mut self,
902        builders_or_callback: &mut BuildersOrCallback<
903            impl FnMut(&mut HydroRoot, &mut usize),
904            impl FnMut(&mut HydroNode, &mut usize),
905        >,
906        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
907        next_stmt_id: &mut usize,
908    ) {
909        match self {
910            HydroRoot::ForEach { f, input, .. } => {
911                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
912
913                match builders_or_callback {
914                    BuildersOrCallback::Builders(graph_builders) => {
915                        graph_builders
916                            .get_dfir_mut(&input.metadata().location_kind)
917                            .add_dfir(
918                                parse_quote! {
919                                    #input_ident -> for_each(#f);
920                                },
921                                None,
922                                Some(&next_stmt_id.to_string()),
923                            );
924                    }
925                    BuildersOrCallback::Callback(leaf_callback, _) => {
926                        leaf_callback(self, next_stmt_id);
927                    }
928                }
929
930                *next_stmt_id += 1;
931            }
932
933            HydroRoot::SendExternal {
934                serialize_fn,
935                instantiate_fn,
936                input,
937                ..
938            } => {
939                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
940
941                match builders_or_callback {
942                    BuildersOrCallback::Builders(graph_builders) => {
943                        let (sink_expr, _) = match instantiate_fn {
944                            DebugInstantiate::Building => (
945                                syn::parse_quote!(DUMMY_SINK),
946                                syn::parse_quote!(DUMMY_SOURCE),
947                            ),
948
949                            DebugInstantiate::Finalized(finalized) => {
950                                (finalized.sink.clone(), finalized.source.clone())
951                            }
952                        };
953
954                        graph_builders.create_external_output(
955                            &input.metadata().location_kind,
956                            sink_expr,
957                            &input_ident,
958                            serialize_fn,
959                            *next_stmt_id,
960                        );
961                    }
962                    BuildersOrCallback::Callback(leaf_callback, _) => {
963                        leaf_callback(self, next_stmt_id);
964                    }
965                }
966
967                *next_stmt_id += 1;
968            }
969
970            HydroRoot::DestSink { sink, input, .. } => {
971                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
972
973                match builders_or_callback {
974                    BuildersOrCallback::Builders(graph_builders) => {
975                        graph_builders
976                            .get_dfir_mut(&input.metadata().location_kind)
977                            .add_dfir(
978                                parse_quote! {
979                                    #input_ident -> dest_sink(#sink);
980                                },
981                                None,
982                                Some(&next_stmt_id.to_string()),
983                            );
984                    }
985                    BuildersOrCallback::Callback(leaf_callback, _) => {
986                        leaf_callback(self, next_stmt_id);
987                    }
988                }
989
990                *next_stmt_id += 1;
991            }
992
993            HydroRoot::CycleSink { ident, input, .. } => {
994                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
995
996                match builders_or_callback {
997                    BuildersOrCallback::Builders(graph_builders) => {
998                        graph_builders
999                            .get_dfir_mut(&input.metadata().location_kind)
1000                            .add_dfir(
1001                                parse_quote! {
1002                                    #ident = #input_ident;
1003                                },
1004                                None,
1005                                None,
1006                            );
1007                    }
1008                    // No ID, no callback
1009                    BuildersOrCallback::Callback(_, _) => {}
1010                }
1011            }
1012        }
1013    }
1014
1015    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1016        match self {
1017            HydroRoot::ForEach { op_metadata, .. }
1018            | HydroRoot::SendExternal { op_metadata, .. }
1019            | HydroRoot::DestSink { op_metadata, .. }
1020            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1021        }
1022    }
1023
1024    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1025        match self {
1026            HydroRoot::ForEach { op_metadata, .. }
1027            | HydroRoot::SendExternal { op_metadata, .. }
1028            | HydroRoot::DestSink { op_metadata, .. }
1029            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1030        }
1031    }
1032
1033    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
1034        match self {
1035            HydroRoot::ForEach { input, .. }
1036            | HydroRoot::SendExternal { input, .. }
1037            | HydroRoot::DestSink { input, .. }
1038            | HydroRoot::CycleSink { input, .. } => {
1039                vec![input.metadata()]
1040            }
1041        }
1042    }
1043
1044    pub fn print_root(&self) -> String {
1045        match self {
1046            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1047            HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1048            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1049            HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1050        }
1051    }
1052
1053    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1054        match self {
1055            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1056                transform(f);
1057            }
1058            HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1059        }
1060    }
1061}
1062
/// Emits every root in `ir` and returns the graph builders that were populated.
///
/// The tee table and the statement counter are shared across all roots, so statement IDs
/// keep counting up from one root to the next.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut graph_builders: BTreeMap<usize, FlatGraphBuilder> = BTreeMap::new();
    let mut emitted_tees = HashMap::new();
    let mut stmt_counter = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(&mut graph_builders, &mut emitted_tees, &mut stmt_counter);
    });

    graph_builders
}
1073
/// Walks `ir` by driving `HydroRoot::emit_core` with the `Callback` variant.
///
/// `transform_root` and `transform_node` are packed into a single
/// `BuildersOrCallback::Callback`; the tee table and the statement counter start fresh and
/// are shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut visited_tees = HashMap::new();
    let mut stmt_counter = 0;
    let mut callbacks = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir {
        root.emit_core(&mut callbacks, &mut visited_tees, &mut stmt_counter);
    }
}
1087
1088pub fn transform_bottom_up(
1089    ir: &mut [HydroRoot],
1090    transform_root: &mut impl FnMut(&mut HydroRoot),
1091    transform_node: &mut impl FnMut(&mut HydroNode),
1092    check_well_formed: bool,
1093) {
1094    let mut seen_tees = HashMap::new();
1095    ir.iter_mut().for_each(|leaf| {
1096        leaf.transform_bottom_up(
1097            transform_root,
1098            transform_node,
1099            &mut seen_tees,
1100            check_well_formed,
1101        );
1102    });
1103}
1104
1105pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1106    let mut seen_tees = HashMap::new();
1107    ir.iter()
1108        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1109        .collect()
1110}
1111
1112type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1113thread_local! {
1114    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1115}
1116
1117pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1118    PRINTED_TEES.with(|printed_tees| {
1119        let mut printed_tees_mut = printed_tees.borrow_mut();
1120        *printed_tees_mut = Some((0, HashMap::new()));
1121        drop(printed_tees_mut);
1122
1123        let ret = f();
1124
1125        let mut printed_tees_mut = printed_tees.borrow_mut();
1126        *printed_tees_mut = None;
1127
1128        ret
1129    })
1130}
1131
1132pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1133
1134impl TeeNode {
1135    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1136        Rc::as_ptr(&self.0)
1137    }
1138}
1139
1140impl Debug for TeeNode {
1141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1142        PRINTED_TEES.with(|printed_tees| {
1143            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1144            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1145
1146            if let Some(printed_tees_mut) = printed_tees_mut {
1147                if let Some(existing) = printed_tees_mut
1148                    .1
1149                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1150                {
1151                    write!(f, "<tee {}>", existing)
1152                } else {
1153                    let next_id = printed_tees_mut.0;
1154                    printed_tees_mut.0 += 1;
1155                    printed_tees_mut
1156                        .1
1157                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1158                    drop(printed_tees_mut_borrow);
1159                    write!(f, "<tee {}>: ", next_id)?;
1160                    Debug::fmt(&self.0.borrow(), f)
1161                }
1162            } else {
1163                drop(printed_tees_mut_borrow);
1164                write!(f, "<tee>: ")?;
1165                Debug::fmt(&self.0.borrow(), f)
1166            }
1167        })
1168    }
1169}
1170
1171impl Hash for TeeNode {
1172    fn hash<H: Hasher>(&self, state: &mut H) {
1173        self.0.borrow_mut().hash(state);
1174    }
1175}
1176
/// Boundedness marker stored in the `bound` field of several [`CollectionKind`] variants.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1182
/// Ordering marker stored in the `order` / `value_order` fields of [`CollectionKind`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1188
/// Retry marker stored in the `retry` / `value_retry` fields of [`CollectionKind`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1194
/// Boundedness marker stored in the `bound` field of [`CollectionKind::KeyedSingleton`].
///
/// Unlike [`BoundKind`] it has a third, intermediate case, `BoundedValue`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1201
1202#[derive(Clone, PartialEq, Eq, Debug)]
1203pub enum CollectionKind {
1204    Stream {
1205        bound: BoundKind,
1206        order: StreamOrder,
1207        retry: StreamRetry,
1208        element_type: DebugType,
1209    },
1210    Singleton {
1211        bound: BoundKind,
1212        element_type: DebugType,
1213    },
1214    Optional {
1215        bound: BoundKind,
1216        element_type: DebugType,
1217    },
1218    KeyedStream {
1219        bound: BoundKind,
1220        value_order: StreamOrder,
1221        value_retry: StreamRetry,
1222        key_type: DebugType,
1223        value_type: DebugType,
1224    },
1225    KeyedSingleton {
1226        bound: KeyedSingletonBoundKind,
1227        key_type: DebugType,
1228        value_type: DebugType,
1229    },
1230}
1231
1232#[derive(Clone)]
1233pub struct HydroIrMetadata {
1234    pub location_kind: LocationId,
1235    pub collection_kind: CollectionKind,
1236    pub cardinality: Option<usize>,
1237    pub tag: Option<String>,
1238    pub op: HydroIrOpMetadata,
1239}
1240
1241// HydroIrMetadata shouldn't be used to hash or compare
1242impl Hash for HydroIrMetadata {
1243    fn hash<H: Hasher>(&self, _: &mut H) {}
1244}
1245
1246impl PartialEq for HydroIrMetadata {
1247    fn eq(&self, _: &Self) -> bool {
1248        true
1249    }
1250}
1251
1252impl Eq for HydroIrMetadata {}
1253
1254impl Debug for HydroIrMetadata {
1255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1256        f.debug_struct("HydroIrMetadata")
1257            .field("location_kind", &self.location_kind)
1258            .field("collection_kind", &self.collection_kind)
1259            .finish()
1260    }
1261}
1262
1263/// Metadata that is specific to the operator itself, rather than its outputs.
1264/// This is available on _both_ inner nodes and roots.
1265#[derive(Clone)]
1266pub struct HydroIrOpMetadata {
1267    pub backtrace: Backtrace,
1268    pub cpu_usage: Option<f64>,
1269    pub network_recv_cpu_usage: Option<f64>,
1270    pub id: Option<usize>,
1271}
1272
1273impl HydroIrOpMetadata {
1274    #[expect(
1275        clippy::new_without_default,
1276        reason = "explicit calls to new ensure correct backtrace bounds"
1277    )]
1278    pub fn new() -> HydroIrOpMetadata {
1279        Self::new_with_skip(1)
1280    }
1281
1282    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1283        HydroIrOpMetadata {
1284            backtrace: Backtrace::get_backtrace(2 + skip_count),
1285            cpu_usage: None,
1286            network_recv_cpu_usage: None,
1287            id: None,
1288        }
1289    }
1290}
1291
1292impl Debug for HydroIrOpMetadata {
1293    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1294        f.debug_struct("HydroIrOpMetadata").finish()
1295    }
1296}
1297
1298impl Hash for HydroIrOpMetadata {
1299    fn hash<H: Hasher>(&self, _: &mut H) {}
1300}
1301
1302/// An intermediate node in a Hydro graph, which consumes data
1303/// from upstream nodes and emits data to downstream nodes.
1304#[derive(Debug, Hash)]
1305pub enum HydroNode {
1306    Placeholder,
1307
1308    /// Manually "casts" between two different collection kinds.
1309    ///
1310    /// Using this IR node requires special care, since it bypasses many of Hydro's core
1311    /// correctness checks. In particular, the user must ensure that every possible
1312    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
1313    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
1314    /// collection. This ensures that the simulator does not miss any possible outputs.
1315    Cast {
1316        inner: Box<HydroNode>,
1317        metadata: HydroIrMetadata,
1318    },
1319
1320    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
1321    /// interpretation of the input stream.
1322    ///
1323    /// In production, this simply passes through the input, but in simulation, this operator
1324    /// explicitly selects a randomized interpretation.
1325    ObserveNonDet {
1326        inner: Box<HydroNode>,
1327        metadata: HydroIrMetadata,
1328    },
1329
1330    Source {
1331        source: HydroSource,
1332        metadata: HydroIrMetadata,
1333    },
1334
1335    CycleSource {
1336        ident: syn::Ident,
1337        metadata: HydroIrMetadata,
1338    },
1339
1340    Tee {
1341        inner: TeeNode,
1342        metadata: HydroIrMetadata,
1343    },
1344
1345    Persist {
1346        inner: Box<HydroNode>,
1347        metadata: HydroIrMetadata,
1348    },
1349
1350    BeginAtomic {
1351        inner: Box<HydroNode>,
1352        metadata: HydroIrMetadata,
1353    },
1354
1355    EndAtomic {
1356        inner: Box<HydroNode>,
1357        metadata: HydroIrMetadata,
1358    },
1359
1360    Batch {
1361        inner: Box<HydroNode>,
1362        metadata: HydroIrMetadata,
1363    },
1364
1365    YieldConcat {
1366        inner: Box<HydroNode>,
1367        metadata: HydroIrMetadata,
1368    },
1369
1370    Chain {
1371        first: Box<HydroNode>,
1372        second: Box<HydroNode>,
1373        metadata: HydroIrMetadata,
1374    },
1375
1376    ChainFirst {
1377        first: Box<HydroNode>,
1378        second: Box<HydroNode>,
1379        metadata: HydroIrMetadata,
1380    },
1381
1382    CrossProduct {
1383        left: Box<HydroNode>,
1384        right: Box<HydroNode>,
1385        metadata: HydroIrMetadata,
1386    },
1387
1388    CrossSingleton {
1389        left: Box<HydroNode>,
1390        right: Box<HydroNode>,
1391        metadata: HydroIrMetadata,
1392    },
1393
1394    Join {
1395        left: Box<HydroNode>,
1396        right: Box<HydroNode>,
1397        metadata: HydroIrMetadata,
1398    },
1399
1400    Difference {
1401        pos: Box<HydroNode>,
1402        neg: Box<HydroNode>,
1403        metadata: HydroIrMetadata,
1404    },
1405
1406    AntiJoin {
1407        pos: Box<HydroNode>,
1408        neg: Box<HydroNode>,
1409        metadata: HydroIrMetadata,
1410    },
1411
1412    ResolveFutures {
1413        input: Box<HydroNode>,
1414        metadata: HydroIrMetadata,
1415    },
1416    ResolveFuturesOrdered {
1417        input: Box<HydroNode>,
1418        metadata: HydroIrMetadata,
1419    },
1420
1421    Map {
1422        f: DebugExpr,
1423        input: Box<HydroNode>,
1424        metadata: HydroIrMetadata,
1425    },
1426    FlatMap {
1427        f: DebugExpr,
1428        input: Box<HydroNode>,
1429        metadata: HydroIrMetadata,
1430    },
1431    Filter {
1432        f: DebugExpr,
1433        input: Box<HydroNode>,
1434        metadata: HydroIrMetadata,
1435    },
1436    FilterMap {
1437        f: DebugExpr,
1438        input: Box<HydroNode>,
1439        metadata: HydroIrMetadata,
1440    },
1441
1442    DeferTick {
1443        input: Box<HydroNode>,
1444        metadata: HydroIrMetadata,
1445    },
1446    Enumerate {
1447        input: Box<HydroNode>,
1448        metadata: HydroIrMetadata,
1449    },
1450    Inspect {
1451        f: DebugExpr,
1452        input: Box<HydroNode>,
1453        metadata: HydroIrMetadata,
1454    },
1455
1456    Unique {
1457        input: Box<HydroNode>,
1458        metadata: HydroIrMetadata,
1459    },
1460
1461    Sort {
1462        input: Box<HydroNode>,
1463        metadata: HydroIrMetadata,
1464    },
1465    Fold {
1466        init: DebugExpr,
1467        acc: DebugExpr,
1468        input: Box<HydroNode>,
1469        metadata: HydroIrMetadata,
1470    },
1471
1472    Scan {
1473        init: DebugExpr,
1474        acc: DebugExpr,
1475        input: Box<HydroNode>,
1476        metadata: HydroIrMetadata,
1477    },
1478    FoldKeyed {
1479        init: DebugExpr,
1480        acc: DebugExpr,
1481        input: Box<HydroNode>,
1482        metadata: HydroIrMetadata,
1483    },
1484
1485    Reduce {
1486        f: DebugExpr,
1487        input: Box<HydroNode>,
1488        metadata: HydroIrMetadata,
1489    },
1490    ReduceKeyed {
1491        f: DebugExpr,
1492        input: Box<HydroNode>,
1493        metadata: HydroIrMetadata,
1494    },
1495    ReduceKeyedWatermark {
1496        f: DebugExpr,
1497        input: Box<HydroNode>,
1498        watermark: Box<HydroNode>,
1499        metadata: HydroIrMetadata,
1500    },
1501
1502    Network {
1503        serialize_fn: Option<DebugExpr>,
1504        instantiate_fn: DebugInstantiate,
1505        deserialize_fn: Option<DebugExpr>,
1506        input: Box<HydroNode>,
1507        metadata: HydroIrMetadata,
1508    },
1509
1510    ExternalInput {
1511        from_external_id: usize,
1512        from_key: usize,
1513        from_many: bool,
1514        codec_type: DebugType,
1515        port_hint: NetworkHint,
1516        instantiate_fn: DebugInstantiate,
1517        deserialize_fn: Option<DebugExpr>,
1518        metadata: HydroIrMetadata,
1519    },
1520
1521    Counter {
1522        tag: String,
1523        duration: DebugExpr,
1524        prefix: String,
1525        input: Box<HydroNode>,
1526        metadata: HydroIrMetadata,
1527    },
1528}
1529
1530pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1531pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1532
1533impl HydroNode {
1534    pub fn transform_bottom_up(
1535        &mut self,
1536        transform: &mut impl FnMut(&mut HydroNode),
1537        seen_tees: &mut SeenTees,
1538        check_well_formed: bool,
1539    ) {
1540        self.transform_children(
1541            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1542            seen_tees,
1543        );
1544
1545        transform(self);
1546
1547        let self_location = self.metadata().location_kind.root();
1548
1549        if check_well_formed {
1550            match &*self {
1551                HydroNode::Network { .. } => {}
1552                _ => {
1553                    self.input_metadata().iter().for_each(|i| {
1554                        if i.location_kind.root() != self_location {
1555                            panic!(
1556                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1557                                i,
1558                                i.location_kind.root(),
1559                                self,
1560                                self_location
1561                            )
1562                        }
1563                    });
1564                }
1565            }
1566        }
1567    }
1568
1569    #[inline(always)]
1570    pub fn transform_children(
1571        &mut self,
1572        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1573        seen_tees: &mut SeenTees,
1574    ) {
1575        match self {
1576            HydroNode::Placeholder => {
1577                panic!();
1578            }
1579
1580            HydroNode::Source { .. }
1581            | HydroNode::CycleSource { .. }
1582            | HydroNode::ExternalInput { .. } => {}
1583
1584            HydroNode::Tee { inner, .. } => {
1585                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1586                    *inner = TeeNode(transformed.clone());
1587                } else {
1588                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1589                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1590                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1591                    transform(&mut orig, seen_tees);
1592                    *transformed_cell.borrow_mut() = orig;
1593                    *inner = TeeNode(transformed_cell);
1594                }
1595            }
1596
1597            HydroNode::Cast { inner, .. }
1598            | HydroNode::ObserveNonDet { inner, .. }
1599            | HydroNode::Persist { inner, .. }
1600            | HydroNode::BeginAtomic { inner, .. }
1601            | HydroNode::EndAtomic { inner, .. }
1602            | HydroNode::Batch { inner, .. }
1603            | HydroNode::YieldConcat { inner, .. } => {
1604                transform(inner.as_mut(), seen_tees);
1605            }
1606
1607            HydroNode::Chain { first, second, .. } => {
1608                transform(first.as_mut(), seen_tees);
1609                transform(second.as_mut(), seen_tees);
1610            }
1611
1612            HydroNode::ChainFirst { first, second, .. } => {
1613                transform(first.as_mut(), seen_tees);
1614                transform(second.as_mut(), seen_tees);
1615            }
1616
1617            HydroNode::CrossSingleton { left, right, .. }
1618            | HydroNode::CrossProduct { left, right, .. }
1619            | HydroNode::Join { left, right, .. } => {
1620                transform(left.as_mut(), seen_tees);
1621                transform(right.as_mut(), seen_tees);
1622            }
1623
1624            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1625                transform(pos.as_mut(), seen_tees);
1626                transform(neg.as_mut(), seen_tees);
1627            }
1628
1629            HydroNode::ReduceKeyedWatermark {
1630                input, watermark, ..
1631            } => {
1632                transform(input.as_mut(), seen_tees);
1633                transform(watermark.as_mut(), seen_tees);
1634            }
1635
1636            HydroNode::Map { input, .. }
1637            | HydroNode::ResolveFutures { input, .. }
1638            | HydroNode::ResolveFuturesOrdered { input, .. }
1639            | HydroNode::FlatMap { input, .. }
1640            | HydroNode::Filter { input, .. }
1641            | HydroNode::FilterMap { input, .. }
1642            | HydroNode::Sort { input, .. }
1643            | HydroNode::DeferTick { input, .. }
1644            | HydroNode::Enumerate { input, .. }
1645            | HydroNode::Inspect { input, .. }
1646            | HydroNode::Unique { input, .. }
1647            | HydroNode::Network { input, .. }
1648            | HydroNode::Fold { input, .. }
1649            | HydroNode::Scan { input, .. }
1650            | HydroNode::FoldKeyed { input, .. }
1651            | HydroNode::Reduce { input, .. }
1652            | HydroNode::ReduceKeyed { input, .. }
1653            | HydroNode::Counter { input, .. } => {
1654                transform(input.as_mut(), seen_tees);
1655            }
1656        }
1657    }
1658
1659    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1660        match self {
1661            HydroNode::Placeholder => HydroNode::Placeholder,
1662            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1663                inner: Box::new(inner.deep_clone(seen_tees)),
1664                metadata: metadata.clone(),
1665            },
1666            HydroNode::ObserveNonDet { inner, metadata } => HydroNode::ObserveNonDet {
1667                inner: Box::new(inner.deep_clone(seen_tees)),
1668                metadata: metadata.clone(),
1669            },
1670            HydroNode::Source { source, metadata } => HydroNode::Source {
1671                source: source.clone(),
1672                metadata: metadata.clone(),
1673            },
1674            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1675                ident: ident.clone(),
1676                metadata: metadata.clone(),
1677            },
1678            HydroNode::Tee { inner, metadata } => {
1679                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1680                    HydroNode::Tee {
1681                        inner: TeeNode(transformed.clone()),
1682                        metadata: metadata.clone(),
1683                    }
1684                } else {
1685                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1686                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1687                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1688                    *new_rc.borrow_mut() = cloned;
1689                    HydroNode::Tee {
1690                        inner: TeeNode(new_rc),
1691                        metadata: metadata.clone(),
1692                    }
1693                }
1694            }
1695            HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1696                inner: Box::new(inner.deep_clone(seen_tees)),
1697                metadata: metadata.clone(),
1698            },
1699            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1700                inner: Box::new(inner.deep_clone(seen_tees)),
1701                metadata: metadata.clone(),
1702            },
1703            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1704                inner: Box::new(inner.deep_clone(seen_tees)),
1705                metadata: metadata.clone(),
1706            },
1707            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1708                inner: Box::new(inner.deep_clone(seen_tees)),
1709                metadata: metadata.clone(),
1710            },
1711            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1712                inner: Box::new(inner.deep_clone(seen_tees)),
1713                metadata: metadata.clone(),
1714            },
1715            HydroNode::Chain {
1716                first,
1717                second,
1718                metadata,
1719            } => HydroNode::Chain {
1720                first: Box::new(first.deep_clone(seen_tees)),
1721                second: Box::new(second.deep_clone(seen_tees)),
1722                metadata: metadata.clone(),
1723            },
1724            HydroNode::ChainFirst {
1725                first,
1726                second,
1727                metadata,
1728            } => HydroNode::ChainFirst {
1729                first: Box::new(first.deep_clone(seen_tees)),
1730                second: Box::new(second.deep_clone(seen_tees)),
1731                metadata: metadata.clone(),
1732            },
1733            HydroNode::CrossProduct {
1734                left,
1735                right,
1736                metadata,
1737            } => HydroNode::CrossProduct {
1738                left: Box::new(left.deep_clone(seen_tees)),
1739                right: Box::new(right.deep_clone(seen_tees)),
1740                metadata: metadata.clone(),
1741            },
1742            HydroNode::CrossSingleton {
1743                left,
1744                right,
1745                metadata,
1746            } => HydroNode::CrossSingleton {
1747                left: Box::new(left.deep_clone(seen_tees)),
1748                right: Box::new(right.deep_clone(seen_tees)),
1749                metadata: metadata.clone(),
1750            },
1751            HydroNode::Join {
1752                left,
1753                right,
1754                metadata,
1755            } => HydroNode::Join {
1756                left: Box::new(left.deep_clone(seen_tees)),
1757                right: Box::new(right.deep_clone(seen_tees)),
1758                metadata: metadata.clone(),
1759            },
1760            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1761                pos: Box::new(pos.deep_clone(seen_tees)),
1762                neg: Box::new(neg.deep_clone(seen_tees)),
1763                metadata: metadata.clone(),
1764            },
1765            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1766                pos: Box::new(pos.deep_clone(seen_tees)),
1767                neg: Box::new(neg.deep_clone(seen_tees)),
1768                metadata: metadata.clone(),
1769            },
1770            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1771                input: Box::new(input.deep_clone(seen_tees)),
1772                metadata: metadata.clone(),
1773            },
1774            HydroNode::ResolveFuturesOrdered { input, metadata } => {
1775                HydroNode::ResolveFuturesOrdered {
1776                    input: Box::new(input.deep_clone(seen_tees)),
1777                    metadata: metadata.clone(),
1778                }
1779            }
1780            HydroNode::Map { f, input, metadata } => HydroNode::Map {
1781                f: f.clone(),
1782                input: Box::new(input.deep_clone(seen_tees)),
1783                metadata: metadata.clone(),
1784            },
1785            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1786                f: f.clone(),
1787                input: Box::new(input.deep_clone(seen_tees)),
1788                metadata: metadata.clone(),
1789            },
1790            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1791                f: f.clone(),
1792                input: Box::new(input.deep_clone(seen_tees)),
1793                metadata: metadata.clone(),
1794            },
1795            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1796                f: f.clone(),
1797                input: Box::new(input.deep_clone(seen_tees)),
1798                metadata: metadata.clone(),
1799            },
1800            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1801                input: Box::new(input.deep_clone(seen_tees)),
1802                metadata: metadata.clone(),
1803            },
1804            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1805                input: Box::new(input.deep_clone(seen_tees)),
1806                metadata: metadata.clone(),
1807            },
1808            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1809                f: f.clone(),
1810                input: Box::new(input.deep_clone(seen_tees)),
1811                metadata: metadata.clone(),
1812            },
1813            HydroNode::Unique { input, metadata } => HydroNode::Unique {
1814                input: Box::new(input.deep_clone(seen_tees)),
1815                metadata: metadata.clone(),
1816            },
1817            HydroNode::Sort { input, metadata } => HydroNode::Sort {
1818                input: Box::new(input.deep_clone(seen_tees)),
1819                metadata: metadata.clone(),
1820            },
1821            HydroNode::Fold {
1822                init,
1823                acc,
1824                input,
1825                metadata,
1826            } => HydroNode::Fold {
1827                init: init.clone(),
1828                acc: acc.clone(),
1829                input: Box::new(input.deep_clone(seen_tees)),
1830                metadata: metadata.clone(),
1831            },
1832            HydroNode::Scan {
1833                init,
1834                acc,
1835                input,
1836                metadata,
1837            } => HydroNode::Scan {
1838                init: init.clone(),
1839                acc: acc.clone(),
1840                input: Box::new(input.deep_clone(seen_tees)),
1841                metadata: metadata.clone(),
1842            },
1843            HydroNode::FoldKeyed {
1844                init,
1845                acc,
1846                input,
1847                metadata,
1848            } => HydroNode::FoldKeyed {
1849                init: init.clone(),
1850                acc: acc.clone(),
1851                input: Box::new(input.deep_clone(seen_tees)),
1852                metadata: metadata.clone(),
1853            },
1854            HydroNode::ReduceKeyedWatermark {
1855                f,
1856                input,
1857                watermark,
1858                metadata,
1859            } => HydroNode::ReduceKeyedWatermark {
1860                f: f.clone(),
1861                input: Box::new(input.deep_clone(seen_tees)),
1862                watermark: Box::new(watermark.deep_clone(seen_tees)),
1863                metadata: metadata.clone(),
1864            },
1865            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1866                f: f.clone(),
1867                input: Box::new(input.deep_clone(seen_tees)),
1868                metadata: metadata.clone(),
1869            },
1870            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1871                f: f.clone(),
1872                input: Box::new(input.deep_clone(seen_tees)),
1873                metadata: metadata.clone(),
1874            },
1875            HydroNode::Network {
1876                serialize_fn,
1877                instantiate_fn,
1878                deserialize_fn,
1879                input,
1880                metadata,
1881            } => HydroNode::Network {
1882                serialize_fn: serialize_fn.clone(),
1883                instantiate_fn: instantiate_fn.clone(),
1884                deserialize_fn: deserialize_fn.clone(),
1885                input: Box::new(input.deep_clone(seen_tees)),
1886                metadata: metadata.clone(),
1887            },
1888            HydroNode::ExternalInput {
1889                from_external_id,
1890                from_key,
1891                from_many,
1892                codec_type,
1893                port_hint,
1894                instantiate_fn,
1895                deserialize_fn,
1896                metadata,
1897            } => HydroNode::ExternalInput {
1898                from_external_id: *from_external_id,
1899                from_key: *from_key,
1900                from_many: *from_many,
1901                codec_type: codec_type.clone(),
1902                port_hint: *port_hint,
1903                instantiate_fn: instantiate_fn.clone(),
1904                deserialize_fn: deserialize_fn.clone(),
1905                metadata: metadata.clone(),
1906            },
1907            HydroNode::Counter {
1908                tag,
1909                duration,
1910                prefix,
1911                input,
1912                metadata,
1913            } => HydroNode::Counter {
1914                tag: tag.clone(),
1915                duration: duration.clone(),
1916                prefix: prefix.clone(),
1917                input: Box::new(input.deep_clone(seen_tees)),
1918                metadata: metadata.clone(),
1919            },
1920        }
1921    }
1922
1923    #[cfg(feature = "build")]
1924    pub fn emit_core(
1925        &mut self,
1926        builders_or_callback: &mut BuildersOrCallback<
1927            impl FnMut(&mut HydroRoot, &mut usize),
1928            impl FnMut(&mut HydroNode, &mut usize),
1929        >,
1930        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1931        next_stmt_id: &mut usize,
1932    ) -> syn::Ident {
1933        let out_location = self.metadata().location_kind.clone();
1934        match self {
1935            HydroNode::Placeholder => {
1936                panic!()
1937            }
1938
1939            HydroNode::Cast { inner, .. } => {
1940                inner.emit_core(builders_or_callback, built_tees, next_stmt_id)
1941            }
1942
1943            HydroNode::ObserveNonDet {
1944                inner, metadata, ..
1945            } => {
1946                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1947
1948                let observe_ident =
1949                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1950
1951                match builders_or_callback {
1952                    BuildersOrCallback::Builders(graph_builders) => {
1953                        graph_builders.observe_nondet(
1954                            &inner.metadata().location_kind,
1955                            inner_ident,
1956                            &inner.metadata().collection_kind,
1957                            &observe_ident,
1958                            &metadata.collection_kind,
1959                        );
1960                    }
1961                    BuildersOrCallback::Callback(_, node_callback) => {
1962                        node_callback(self, next_stmt_id);
1963                    }
1964                }
1965
1966                *next_stmt_id += 1;
1967
1968                observe_ident
1969            }
1970
1971            HydroNode::Persist { inner, .. } => {
1972                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1973
1974                let persist_ident =
1975                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1976
1977                match builders_or_callback {
1978                    BuildersOrCallback::Builders(graph_builders) => {
1979                        let builder = graph_builders.get_dfir_mut(&out_location);
1980                        builder.add_dfir(
1981                            parse_quote! {
1982                                #persist_ident = #inner_ident -> persist::<'static>();
1983                            },
1984                            None,
1985                            Some(&next_stmt_id.to_string()),
1986                        );
1987                    }
1988                    BuildersOrCallback::Callback(_, node_callback) => {
1989                        node_callback(self, next_stmt_id);
1990                    }
1991                }
1992
1993                *next_stmt_id += 1;
1994
1995                persist_ident
1996            }
1997
1998            HydroNode::Batch {
1999                inner, metadata, ..
2000            } => {
2001                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2002
2003                let batch_ident =
2004                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2005
2006                match builders_or_callback {
2007                    BuildersOrCallback::Builders(graph_builders) => {
2008                        graph_builders.batch(
2009                            inner_ident,
2010                            &inner.metadata().location_kind,
2011                            &inner.metadata().collection_kind,
2012                            &batch_ident,
2013                            &out_location,
2014                            &metadata.op,
2015                        );
2016                    }
2017                    BuildersOrCallback::Callback(_, node_callback) => {
2018                        node_callback(self, next_stmt_id);
2019                    }
2020                }
2021
2022                *next_stmt_id += 1;
2023
2024                batch_ident
2025            }
2026
2027            HydroNode::YieldConcat { inner, .. } => {
2028                let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2029
2030                let yield_ident =
2031                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2032
2033                match builders_or_callback {
2034                    BuildersOrCallback::Builders(graph_builders) => {
2035                        graph_builders.yield_from_tick(
2036                            inner_ident,
2037                            &inner.metadata().location_kind,
2038                            &inner.metadata().collection_kind,
2039                            &yield_ident,
2040                        );
2041                    }
2042                    BuildersOrCallback::Callback(_, node_callback) => {
2043                        node_callback(self, next_stmt_id);
2044                    }
2045                }
2046
2047                *next_stmt_id += 1;
2048
2049                yield_ident
2050            }
2051
2052            HydroNode::BeginAtomic { inner, .. } => {
2053                inner.emit_core(builders_or_callback, built_tees, next_stmt_id)
2054            }
2055
2056            HydroNode::EndAtomic { inner, .. } => {
2057                inner.emit_core(builders_or_callback, built_tees, next_stmt_id)
2058            }
2059
2060            HydroNode::Source {
2061                source, metadata, ..
2062            } => {
2063                if let HydroSource::ExternalNetwork() = source {
2064                    syn::Ident::new("DUMMY", Span::call_site())
2065                } else {
2066                    let source_ident =
2067                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2068
2069                    let source_stmt = match source {
2070                        HydroSource::Stream(expr) => {
2071                            debug_assert!(metadata.location_kind.is_top_level());
2072                            parse_quote! {
2073                                #source_ident = source_stream(#expr);
2074                            }
2075                        }
2076
2077                        HydroSource::ExternalNetwork() => {
2078                            unreachable!()
2079                        }
2080
2081                        HydroSource::Iter(expr) => {
2082                            if metadata.location_kind.is_top_level() {
2083                                parse_quote! {
2084                                    #source_ident = source_iter(#expr);
2085                                }
2086                            } else {
2087                                // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
2088                                parse_quote! {
2089                                    #source_ident = source_iter(#expr) -> persist::<'static>();
2090                                }
2091                            }
2092                        }
2093
2094                        HydroSource::Spin() => {
2095                            debug_assert!(metadata.location_kind.is_top_level());
2096                            parse_quote! {
2097                                #source_ident = spin();
2098                            }
2099                        }
2100                    };
2101
2102                    match builders_or_callback {
2103                        BuildersOrCallback::Builders(graph_builders) => {
2104                            let builder = graph_builders.get_dfir_mut(&out_location);
2105                            builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2106                        }
2107                        BuildersOrCallback::Callback(_, node_callback) => {
2108                            node_callback(self, next_stmt_id);
2109                        }
2110                    }
2111
2112                    *next_stmt_id += 1;
2113
2114                    source_ident
2115                }
2116            }
2117
2118            HydroNode::CycleSource { ident, .. } => {
2119                let ident = ident.clone();
2120
2121                match builders_or_callback {
2122                    BuildersOrCallback::Builders(_) => {}
2123                    BuildersOrCallback::Callback(_, node_callback) => {
2124                        node_callback(self, next_stmt_id);
2125                    }
2126                }
2127
2128                // consume a stmt id even though we did not emit anything so that we can instrument this
2129                *next_stmt_id += 1;
2130
2131                ident
2132            }
2133
2134            HydroNode::Tee { inner, .. } => {
2135                let ret_ident = if let Some(teed_from) =
2136                    built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2137                {
2138                    match builders_or_callback {
2139                        BuildersOrCallback::Builders(_) => {}
2140                        BuildersOrCallback::Callback(_, node_callback) => {
2141                            node_callback(self, next_stmt_id);
2142                        }
2143                    }
2144
2145                    teed_from.clone()
2146                } else {
2147                    let inner_ident = inner.0.borrow_mut().emit_core(
2148                        builders_or_callback,
2149                        built_tees,
2150                        next_stmt_id,
2151                    );
2152
2153                    let tee_ident =
2154                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2155
2156                    built_tees.insert(
2157                        inner.0.as_ref() as *const RefCell<HydroNode>,
2158                        tee_ident.clone(),
2159                    );
2160
2161                    match builders_or_callback {
2162                        BuildersOrCallback::Builders(graph_builders) => {
2163                            let builder = graph_builders.get_dfir_mut(&out_location);
2164                            builder.add_dfir(
2165                                parse_quote! {
2166                                    #tee_ident = #inner_ident -> tee();
2167                                },
2168                                None,
2169                                Some(&next_stmt_id.to_string()),
2170                            );
2171                        }
2172                        BuildersOrCallback::Callback(_, node_callback) => {
2173                            node_callback(self, next_stmt_id);
2174                        }
2175                    }
2176
2177                    tee_ident
2178                };
2179
2180                // we consume a stmt id regardless of if we emit the tee() operator,
2181                // so that during rewrites we touch all recipients of the tee()
2182
2183                *next_stmt_id += 1;
2184                ret_ident
2185            }
2186
2187            HydroNode::Chain { first, second, .. } => {
2188                let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2189                let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2190
2191                let chain_ident =
2192                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2193
2194                match builders_or_callback {
2195                    BuildersOrCallback::Builders(graph_builders) => {
2196                        let builder = graph_builders.get_dfir_mut(&out_location);
2197                        builder.add_dfir(
2198                            parse_quote! {
2199                                #chain_ident = chain();
2200                                #first_ident -> [0]#chain_ident;
2201                                #second_ident -> [1]#chain_ident;
2202                            },
2203                            None,
2204                            Some(&next_stmt_id.to_string()),
2205                        );
2206                    }
2207                    BuildersOrCallback::Callback(_, node_callback) => {
2208                        node_callback(self, next_stmt_id);
2209                    }
2210                }
2211
2212                *next_stmt_id += 1;
2213
2214                chain_ident
2215            }
2216
2217            HydroNode::ChainFirst { first, second, .. } => {
2218                let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2219                let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2220
2221                let chain_ident =
2222                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2223
2224                match builders_or_callback {
2225                    BuildersOrCallback::Builders(graph_builders) => {
2226                        let builder = graph_builders.get_dfir_mut(&out_location);
2227                        builder.add_dfir(
2228                            parse_quote! {
2229                                #chain_ident = chain_first_n(1);
2230                                #first_ident -> [0]#chain_ident;
2231                                #second_ident -> [1]#chain_ident;
2232                            },
2233                            None,
2234                            Some(&next_stmt_id.to_string()),
2235                        );
2236                    }
2237                    BuildersOrCallback::Callback(_, node_callback) => {
2238                        node_callback(self, next_stmt_id);
2239                    }
2240                }
2241
2242                *next_stmt_id += 1;
2243
2244                chain_ident
2245            }
2246
2247            HydroNode::CrossSingleton { left, right, .. } => {
2248                let left_ident = left.emit_core(builders_or_callback, built_tees, next_stmt_id);
2249                let right_ident = right.emit_core(builders_or_callback, built_tees, next_stmt_id);
2250
2251                let cross_ident =
2252                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2253
2254                match builders_or_callback {
2255                    BuildersOrCallback::Builders(graph_builders) => {
2256                        let builder = graph_builders.get_dfir_mut(&out_location);
2257                        builder.add_dfir(
2258                            parse_quote! {
2259                                #cross_ident = cross_singleton();
2260                                #left_ident -> [input]#cross_ident;
2261                                #right_ident -> [single]#cross_ident;
2262                            },
2263                            None,
2264                            Some(&next_stmt_id.to_string()),
2265                        );
2266                    }
2267                    BuildersOrCallback::Callback(_, node_callback) => {
2268                        node_callback(self, next_stmt_id);
2269                    }
2270                }
2271
2272                *next_stmt_id += 1;
2273
2274                cross_ident
2275            }
2276
2277            HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2278                let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
2279                    parse_quote!(cross_join_multiset)
2280                } else {
2281                    parse_quote!(join_multiset)
2282                };
2283
2284                let (HydroNode::CrossProduct { left, right, .. }
2285                | HydroNode::Join { left, right, .. }) = self
2286                else {
2287                    unreachable!()
2288                };
2289
2290                let is_top_level = left.metadata().location_kind.is_top_level()
2291                    && right.metadata().location_kind.is_top_level();
2292                let (left_inner, left_lifetime) =
2293                    if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
2294                        debug_assert!(!left.metadata().location_kind.is_top_level());
2295                        (left, quote!('static))
2296                    } else if left.metadata().location_kind.is_top_level() {
2297                        (left, quote!('static))
2298                    } else {
2299                        (left, quote!('tick))
2300                    };
2301
2302                let (right_inner, right_lifetime) =
2303                    if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
2304                        debug_assert!(!right.metadata().location_kind.is_top_level());
2305                        (right, quote!('static))
2306                    } else if right.metadata().location_kind.is_top_level() {
2307                        (right, quote!('static))
2308                    } else {
2309                        (right, quote!('tick))
2310                    };
2311
2312                let left_ident =
2313                    left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2314                let right_ident =
2315                    right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2316
2317                let stream_ident =
2318                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2319
2320                match builders_or_callback {
2321                    BuildersOrCallback::Builders(graph_builders) => {
2322                        let builder = graph_builders.get_dfir_mut(&out_location);
2323                        builder.add_dfir(
2324                            if is_top_level {
2325                                // if both inputs are root, the output is expected to have streamy semantics, so we need
2326                                // a multiset_delta() to negate the replay behavior
2327                                parse_quote! {
2328                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2329                                    #left_ident -> [0]#stream_ident;
2330                                    #right_ident -> [1]#stream_ident;
2331                                }
2332                            } else {
2333                                parse_quote! {
2334                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2335                                    #left_ident -> [0]#stream_ident;
2336                                    #right_ident -> [1]#stream_ident;
2337                                }
2338                            }
2339                            ,
2340                            None,
2341                            Some(&next_stmt_id.to_string()),
2342                        );
2343                    }
2344                    BuildersOrCallback::Callback(_, node_callback) => {
2345                        node_callback(self, next_stmt_id);
2346                    }
2347                }
2348
2349                *next_stmt_id += 1;
2350
2351                stream_ident
2352            }
2353
2354            HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2355                let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2356                    parse_quote!(difference_multiset)
2357                } else {
2358                    parse_quote!(anti_join_multiset)
2359                };
2360
2361                let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2362                    self
2363                else {
2364                    unreachable!()
2365                };
2366
2367                let (neg, neg_lifetime) =
2368                    if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
2369                        debug_assert!(!neg.metadata().location_kind.is_top_level());
2370                        (neg, quote!('static))
2371                    } else if neg.metadata().location_kind.is_top_level() {
2372                        (neg, quote!('static))
2373                    } else {
2374                        (neg, quote!('tick))
2375                    };
2376
2377                let pos_ident = pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
2378                let neg_ident = neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
2379
2380                let stream_ident =
2381                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2382
2383                match builders_or_callback {
2384                    BuildersOrCallback::Builders(graph_builders) => {
2385                        let builder = graph_builders.get_dfir_mut(&out_location);
2386                        builder.add_dfir(
2387                            parse_quote! {
2388                                #stream_ident = #operator::<'tick, #neg_lifetime>();
2389                                #pos_ident -> [pos]#stream_ident;
2390                                #neg_ident -> [neg]#stream_ident;
2391                            },
2392                            None,
2393                            Some(&next_stmt_id.to_string()),
2394                        );
2395                    }
2396                    BuildersOrCallback::Callback(_, node_callback) => {
2397                        node_callback(self, next_stmt_id);
2398                    }
2399                }
2400
2401                *next_stmt_id += 1;
2402
2403                stream_ident
2404            }
2405
2406            HydroNode::ResolveFutures { input, .. } => {
2407                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2408
2409                let futures_ident =
2410                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2411
2412                match builders_or_callback {
2413                    BuildersOrCallback::Builders(graph_builders) => {
2414                        let builder = graph_builders.get_dfir_mut(&out_location);
2415                        builder.add_dfir(
2416                            parse_quote! {
2417                                #futures_ident = #input_ident -> resolve_futures();
2418                            },
2419                            None,
2420                            Some(&next_stmt_id.to_string()),
2421                        );
2422                    }
2423                    BuildersOrCallback::Callback(_, node_callback) => {
2424                        node_callback(self, next_stmt_id);
2425                    }
2426                }
2427
2428                *next_stmt_id += 1;
2429
2430                futures_ident
2431            }
2432
2433            HydroNode::ResolveFuturesOrdered { input, .. } => {
2434                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2435
2436                let futures_ident =
2437                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2438
2439                match builders_or_callback {
2440                    BuildersOrCallback::Builders(graph_builders) => {
2441                        let builder = graph_builders.get_dfir_mut(&out_location);
2442                        builder.add_dfir(
2443                            parse_quote! {
2444                                #futures_ident = #input_ident -> resolve_futures_ordered();
2445                            },
2446                            None,
2447                            Some(&next_stmt_id.to_string()),
2448                        );
2449                    }
2450                    BuildersOrCallback::Callback(_, node_callback) => {
2451                        node_callback(self, next_stmt_id);
2452                    }
2453                }
2454
2455                *next_stmt_id += 1;
2456
2457                futures_ident
2458            }
2459
2460            HydroNode::Map { f, input, .. } => {
2461                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2462
2463                let map_ident =
2464                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2465
2466                match builders_or_callback {
2467                    BuildersOrCallback::Builders(graph_builders) => {
2468                        let builder = graph_builders.get_dfir_mut(&out_location);
2469                        builder.add_dfir(
2470                            parse_quote! {
2471                                #map_ident = #input_ident -> map(#f);
2472                            },
2473                            None,
2474                            Some(&next_stmt_id.to_string()),
2475                        );
2476                    }
2477                    BuildersOrCallback::Callback(_, node_callback) => {
2478                        node_callback(self, next_stmt_id);
2479                    }
2480                }
2481
2482                *next_stmt_id += 1;
2483
2484                map_ident
2485            }
2486
2487            HydroNode::FlatMap { f, input, .. } => {
2488                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2489
2490                let flat_map_ident =
2491                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2492
2493                match builders_or_callback {
2494                    BuildersOrCallback::Builders(graph_builders) => {
2495                        let builder = graph_builders.get_dfir_mut(&out_location);
2496                        builder.add_dfir(
2497                            parse_quote! {
2498                                #flat_map_ident = #input_ident -> flat_map(#f);
2499                            },
2500                            None,
2501                            Some(&next_stmt_id.to_string()),
2502                        );
2503                    }
2504                    BuildersOrCallback::Callback(_, node_callback) => {
2505                        node_callback(self, next_stmt_id);
2506                    }
2507                }
2508
2509                *next_stmt_id += 1;
2510
2511                flat_map_ident
2512            }
2513
2514            HydroNode::Filter { f, input, .. } => {
2515                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2516
2517                let filter_ident =
2518                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2519
2520                match builders_or_callback {
2521                    BuildersOrCallback::Builders(graph_builders) => {
2522                        let builder = graph_builders.get_dfir_mut(&out_location);
2523                        builder.add_dfir(
2524                            parse_quote! {
2525                                #filter_ident = #input_ident -> filter(#f);
2526                            },
2527                            None,
2528                            Some(&next_stmt_id.to_string()),
2529                        );
2530                    }
2531                    BuildersOrCallback::Callback(_, node_callback) => {
2532                        node_callback(self, next_stmt_id);
2533                    }
2534                }
2535
2536                *next_stmt_id += 1;
2537
2538                filter_ident
2539            }
2540
2541            HydroNode::FilterMap { f, input, .. } => {
2542                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2543
2544                let filter_map_ident =
2545                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2546
2547                match builders_or_callback {
2548                    BuildersOrCallback::Builders(graph_builders) => {
2549                        let builder = graph_builders.get_dfir_mut(&out_location);
2550                        builder.add_dfir(
2551                            parse_quote! {
2552                                #filter_map_ident = #input_ident -> filter_map(#f);
2553                            },
2554                            None,
2555                            Some(&next_stmt_id.to_string()),
2556                        );
2557                    }
2558                    BuildersOrCallback::Callback(_, node_callback) => {
2559                        node_callback(self, next_stmt_id);
2560                    }
2561                }
2562
2563                *next_stmt_id += 1;
2564
2565                filter_map_ident
2566            }
2567
2568            HydroNode::Sort { input, .. } => {
2569                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2570
2571                let sort_ident =
2572                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2573
2574                match builders_or_callback {
2575                    BuildersOrCallback::Builders(graph_builders) => {
2576                        let builder = graph_builders.get_dfir_mut(&out_location);
2577                        builder.add_dfir(
2578                            parse_quote! {
2579                                #sort_ident = #input_ident -> sort();
2580                            },
2581                            None,
2582                            Some(&next_stmt_id.to_string()),
2583                        );
2584                    }
2585                    BuildersOrCallback::Callback(_, node_callback) => {
2586                        node_callback(self, next_stmt_id);
2587                    }
2588                }
2589
2590                *next_stmt_id += 1;
2591
2592                sort_ident
2593            }
2594
2595            HydroNode::DeferTick { input, .. } => {
2596                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2597
2598                let defer_tick_ident =
2599                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2600
2601                match builders_or_callback {
2602                    BuildersOrCallback::Builders(graph_builders) => {
2603                        let builder = graph_builders.get_dfir_mut(&out_location);
2604                        builder.add_dfir(
2605                            parse_quote! {
2606                                #defer_tick_ident = #input_ident -> defer_tick_lazy();
2607                            },
2608                            None,
2609                            Some(&next_stmt_id.to_string()),
2610                        );
2611                    }
2612                    BuildersOrCallback::Callback(_, node_callback) => {
2613                        node_callback(self, next_stmt_id);
2614                    }
2615                }
2616
2617                *next_stmt_id += 1;
2618
2619                defer_tick_ident
2620            }
2621
2622            HydroNode::Enumerate { input, .. } => {
2623                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2624
2625                let enumerate_ident =
2626                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2627
2628                match builders_or_callback {
2629                    BuildersOrCallback::Builders(graph_builders) => {
2630                        let builder = graph_builders.get_dfir_mut(&out_location);
2631                        let lifetime = if input.metadata().location_kind.is_top_level() {
2632                            quote!('static)
2633                        } else {
2634                            quote!('tick)
2635                        };
2636                        builder.add_dfir(
2637                            parse_quote! {
2638                                #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2639                            },
2640                            None,
2641                            Some(&next_stmt_id.to_string()),
2642                        );
2643                    }
2644                    BuildersOrCallback::Callback(_, node_callback) => {
2645                        node_callback(self, next_stmt_id);
2646                    }
2647                }
2648
2649                *next_stmt_id += 1;
2650
2651                enumerate_ident
2652            }
2653
2654            HydroNode::Inspect { f, input, .. } => {
2655                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2656
2657                let inspect_ident =
2658                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2659
2660                match builders_or_callback {
2661                    BuildersOrCallback::Builders(graph_builders) => {
2662                        let builder = graph_builders.get_dfir_mut(&out_location);
2663                        builder.add_dfir(
2664                            parse_quote! {
2665                                #inspect_ident = #input_ident -> inspect(#f);
2666                            },
2667                            None,
2668                            Some(&next_stmt_id.to_string()),
2669                        );
2670                    }
2671                    BuildersOrCallback::Callback(_, node_callback) => {
2672                        node_callback(self, next_stmt_id);
2673                    }
2674                }
2675
2676                *next_stmt_id += 1;
2677
2678                inspect_ident
2679            }
2680
2681            HydroNode::Unique { input, .. } => {
2682                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2683
2684                let unique_ident =
2685                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2686
2687                match builders_or_callback {
2688                    BuildersOrCallback::Builders(graph_builders) => {
2689                        let builder = graph_builders.get_dfir_mut(&out_location);
2690                        let lifetime = if input.metadata().location_kind.is_top_level() {
2691                            quote!('static)
2692                        } else {
2693                            quote!('tick)
2694                        };
2695
2696                        builder.add_dfir(
2697                            parse_quote! {
2698                                #unique_ident = #input_ident -> unique::<#lifetime>();
2699                            },
2700                            None,
2701                            Some(&next_stmt_id.to_string()),
2702                        );
2703                    }
2704                    BuildersOrCallback::Callback(_, node_callback) => {
2705                        node_callback(self, next_stmt_id);
2706                    }
2707                }
2708
2709                *next_stmt_id += 1;
2710
2711                unique_ident
2712            }
2713
2714            HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2715                let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2716                    parse_quote!(fold)
2717                } else if matches!(self, HydroNode::Scan { .. }) {
2718                    parse_quote!(scan)
2719                } else {
2720                    parse_quote!(fold_keyed)
2721                };
2722
2723                let (HydroNode::Fold {
2724                    init, acc, input, ..
2725                }
2726                | HydroNode::FoldKeyed {
2727                    init, acc, input, ..
2728                }
2729                | HydroNode::Scan {
2730                    init, acc, input, ..
2731                }) = self
2732                else {
2733                    unreachable!()
2734                };
2735
2736                let (input, lifetime) =
2737                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2738                        debug_assert!(!input.metadata().location_kind.is_top_level());
2739                        (input, quote!('static))
2740                    } else if input.metadata().location_kind.is_top_level() {
2741                        (input, quote!('static))
2742                    } else {
2743                        (input, quote!('tick))
2744                    };
2745
2746                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2747
2748                let fold_ident =
2749                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2750
2751                match builders_or_callback {
2752                    BuildersOrCallback::Builders(graph_builders) => {
2753                        let builder = graph_builders.get_dfir_mut(&out_location);
2754                        builder.add_dfir(
2755                            parse_quote! {
2756                                #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
2757                            },
2758                            None,
2759                            Some(&next_stmt_id.to_string()),
2760                        );
2761                    }
2762                    BuildersOrCallback::Callback(_, node_callback) => {
2763                        node_callback(self, next_stmt_id);
2764                    }
2765                }
2766
2767                *next_stmt_id += 1;
2768
2769                fold_ident
2770            }
2771
2772            HydroNode::ReduceKeyedWatermark {
2773                f,
2774                input,
2775                watermark,
2776                ..
2777            } => {
2778                let (input, lifetime) =
2779                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2780                        debug_assert!(!input.metadata().location_kind.is_top_level());
2781                        (input, quote!('static))
2782                    } else if input.metadata().location_kind.is_top_level() {
2783                        (input, quote!('static))
2784                    } else {
2785                        (input, quote!('tick))
2786                    };
2787
2788                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2789
2790                let watermark_ident =
2791                    watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
2792
2793                let chain_ident = syn::Ident::new(
2794                    &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
2795                    Span::call_site(),
2796                );
2797
2798                let fold_ident =
2799                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2800
2801                match builders_or_callback {
2802                    BuildersOrCallback::Builders(graph_builders) => {
2803                        let builder = graph_builders.get_dfir_mut(&out_location);
                        // 1. Don't allow any values to be added to the map if the key <= the watermark.
                        // 2. If the entry didn't exist in the HashMap, add it. Otherwise, call f.
                        //    If the watermark advanced, delete all HashMap entries with a key <= the watermark.
                        // 3. Convert the HashMap back into a stream of (k, v).
2808                        builder.add_dfir(
2809                            parse_quote! {
2810                                #chain_ident = chain();
2811                                #input_ident
2812                                    -> map(|x| (Some(x), None))
2813                                    -> [0]#chain_ident;
2814                                #watermark_ident
2815                                    -> map(|watermark| (None, Some(watermark)))
2816                                    -> [1]#chain_ident;
2817
2818                                #fold_ident = #chain_ident
2819                                    -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
2820                                        let __reduce_keyed_fn = #f;
2821                                        move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
2822                                            if let Some((k, v)) = opt_payload {
2823                                                if let Some(curr_watermark) = *opt_curr_watermark {
2824                                                    if k <= curr_watermark {
2825                                                        return;
2826                                                    }
2827                                                }
2828                                                match map.entry(k) {
2829                                                    ::std::collections::hash_map::Entry::Vacant(e) => {
2830                                                        e.insert(v);
2831                                                    }
2832                                                    ::std::collections::hash_map::Entry::Occupied(mut e) => {
2833                                                        __reduce_keyed_fn(e.get_mut(), v);
2834                                                    }
2835                                                }
2836                                            } else {
2837                                                let watermark = opt_watermark.unwrap();
2838                                                if let Some(curr_watermark) = *opt_curr_watermark {
2839                                                    if watermark <= curr_watermark {
2840                                                        return;
2841                                                    }
2842                                                }
2843                                                *opt_curr_watermark = opt_watermark;
2844                                                map.retain(|k, _| *k > watermark);
2845                                            }
2846                                        }
2847                                    })
2848                                    -> flat_map(|(map, _curr_watermark)| map);
2849                            },
2850                            None,
2851                            Some(&next_stmt_id.to_string()),
2852                        );
2853                    }
2854                    BuildersOrCallback::Callback(_, node_callback) => {
2855                        node_callback(self, next_stmt_id);
2856                    }
2857                }
2858
2859                *next_stmt_id += 1;
2860
2861                fold_ident
2862            }
2863
2864            HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
2865                let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
2866                    parse_quote!(reduce)
2867                } else {
2868                    parse_quote!(reduce_keyed)
2869                };
2870
2871                let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
2872                    self
2873                else {
2874                    unreachable!()
2875                };
2876
2877                let (input, lifetime) =
2878                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2879                        debug_assert!(!input.metadata().location_kind.is_top_level());
2880                        (input, quote!('static))
2881                    } else if input.metadata().location_kind.is_top_level() {
2882                        (input, quote!('static))
2883                    } else {
2884                        (input, quote!('tick))
2885                    };
2886
2887                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2888
2889                let reduce_ident =
2890                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2891
2892                match builders_or_callback {
2893                    BuildersOrCallback::Builders(graph_builders) => {
2894                        let builder = graph_builders.get_dfir_mut(&out_location);
2895                        builder.add_dfir(
2896                            parse_quote! {
2897                                #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
2898                            },
2899                            None,
2900                            Some(&next_stmt_id.to_string()),
2901                        );
2902                    }
2903                    BuildersOrCallback::Callback(_, node_callback) => {
2904                        node_callback(self, next_stmt_id);
2905                    }
2906                }
2907
2908                *next_stmt_id += 1;
2909
2910                reduce_ident
2911            }
2912
2913            HydroNode::Network {
2914                serialize_fn: serialize_pipeline,
2915                instantiate_fn,
2916                deserialize_fn: deserialize_pipeline,
2917                input,
2918                ..
2919            } => {
2920                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2921
2922                let receiver_stream_ident =
2923                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2924
2925                match builders_or_callback {
2926                    BuildersOrCallback::Builders(graph_builders) => {
2927                        let (sink_expr, source_expr) = match instantiate_fn {
2928                            DebugInstantiate::Building => (
2929                                syn::parse_quote!(DUMMY_SINK),
2930                                syn::parse_quote!(DUMMY_SOURCE),
2931                            ),
2932
2933                            DebugInstantiate::Finalized(finalized) => {
2934                                (finalized.sink.clone(), finalized.source.clone())
2935                            }
2936                        };
2937
2938                        graph_builders.create_network(
2939                            &input.metadata().location_kind,
2940                            &out_location,
2941                            input_ident,
2942                            &receiver_stream_ident,
2943                            serialize_pipeline,
2944                            sink_expr,
2945                            source_expr,
2946                            deserialize_pipeline,
2947                            *next_stmt_id,
2948                        );
2949                    }
2950                    BuildersOrCallback::Callback(_, node_callback) => {
2951                        node_callback(self, next_stmt_id);
2952                    }
2953                }
2954
2955                *next_stmt_id += 1;
2956
2957                receiver_stream_ident
2958            }
2959
2960            HydroNode::ExternalInput {
2961                instantiate_fn,
2962                deserialize_fn: deserialize_pipeline,
2963                ..
2964            } => {
2965                let receiver_stream_ident =
2966                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2967
2968                match builders_or_callback {
2969                    BuildersOrCallback::Builders(graph_builders) => {
2970                        let (_, source_expr) = match instantiate_fn {
2971                            DebugInstantiate::Building => (
2972                                syn::parse_quote!(DUMMY_SINK),
2973                                syn::parse_quote!(DUMMY_SOURCE),
2974                            ),
2975
2976                            DebugInstantiate::Finalized(finalized) => {
2977                                (finalized.sink.clone(), finalized.source.clone())
2978                            }
2979                        };
2980
2981                        graph_builders.create_external_source(
2982                            &out_location,
2983                            source_expr,
2984                            &receiver_stream_ident,
2985                            deserialize_pipeline,
2986                            *next_stmt_id,
2987                        );
2988                    }
2989                    BuildersOrCallback::Callback(_, node_callback) => {
2990                        node_callback(self, next_stmt_id);
2991                    }
2992                }
2993
2994                *next_stmt_id += 1;
2995
2996                receiver_stream_ident
2997            }
2998
2999            HydroNode::Counter {
3000                tag,
3001                duration,
3002                prefix,
3003                input,
3004                ..
3005            } => {
3006                let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3007
3008                let counter_ident =
3009                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3010
3011                match builders_or_callback {
3012                    BuildersOrCallback::Builders(graph_builders) => {
3013                        let builder = graph_builders.get_dfir_mut(&out_location);
3014                        builder.add_dfir(
3015                            parse_quote! {
3016                                #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3017                            },
3018                            None,
3019                            Some(&next_stmt_id.to_string()),
3020                        );
3021                    }
3022                    BuildersOrCallback::Callback(_, node_callback) => {
3023                        node_callback(self, next_stmt_id);
3024                    }
3025                }
3026
3027                *next_stmt_id += 1;
3028
3029                counter_ident
3030            }
3031        }
3032    }
3033
3034    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3035        match self {
3036            HydroNode::Placeholder => {
3037                panic!()
3038            }
3039            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3040            HydroNode::Source { source, .. } => match source {
3041                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3042                HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
3043            },
3044            HydroNode::CycleSource { .. }
3045            | HydroNode::Tee { .. }
3046            | HydroNode::Persist { .. }
3047            | HydroNode::YieldConcat { .. }
3048            | HydroNode::BeginAtomic { .. }
3049            | HydroNode::EndAtomic { .. }
3050            | HydroNode::Batch { .. }
3051            | HydroNode::Chain { .. }
3052            | HydroNode::ChainFirst { .. }
3053            | HydroNode::CrossProduct { .. }
3054            | HydroNode::CrossSingleton { .. }
3055            | HydroNode::ResolveFutures { .. }
3056            | HydroNode::ResolveFuturesOrdered { .. }
3057            | HydroNode::Join { .. }
3058            | HydroNode::Difference { .. }
3059            | HydroNode::AntiJoin { .. }
3060            | HydroNode::DeferTick { .. }
3061            | HydroNode::Enumerate { .. }
3062            | HydroNode::Unique { .. }
3063            | HydroNode::Sort { .. } => {}
3064            HydroNode::Map { f, .. }
3065            | HydroNode::FlatMap { f, .. }
3066            | HydroNode::Filter { f, .. }
3067            | HydroNode::FilterMap { f, .. }
3068            | HydroNode::Inspect { f, .. }
3069            | HydroNode::Reduce { f, .. }
3070            | HydroNode::ReduceKeyed { f, .. }
3071            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3072                transform(f);
3073            }
3074            HydroNode::Fold { init, acc, .. }
3075            | HydroNode::Scan { init, acc, .. }
3076            | HydroNode::FoldKeyed { init, acc, .. } => {
3077                transform(init);
3078                transform(acc);
3079            }
3080            HydroNode::Network {
3081                serialize_fn,
3082                deserialize_fn,
3083                ..
3084            } => {
3085                if let Some(serialize_fn) = serialize_fn {
3086                    transform(serialize_fn);
3087                }
3088                if let Some(deserialize_fn) = deserialize_fn {
3089                    transform(deserialize_fn);
3090                }
3091            }
3092            HydroNode::ExternalInput { deserialize_fn, .. } => {
3093                if let Some(deserialize_fn) = deserialize_fn {
3094                    transform(deserialize_fn);
3095                }
3096            }
3097            HydroNode::Counter { duration, .. } => {
3098                transform(duration);
3099            }
3100        }
3101    }
3102
3103    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3104        &self.metadata().op
3105    }
3106
3107    pub fn metadata(&self) -> &HydroIrMetadata {
3108        match self {
3109            HydroNode::Placeholder => {
3110                panic!()
3111            }
3112            HydroNode::Cast { metadata, .. } => metadata,
3113            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3114            HydroNode::Source { metadata, .. } => metadata,
3115            HydroNode::CycleSource { metadata, .. } => metadata,
3116            HydroNode::Tee { metadata, .. } => metadata,
3117            HydroNode::Persist { metadata, .. } => metadata,
3118            HydroNode::YieldConcat { metadata, .. } => metadata,
3119            HydroNode::BeginAtomic { metadata, .. } => metadata,
3120            HydroNode::EndAtomic { metadata, .. } => metadata,
3121            HydroNode::Batch { metadata, .. } => metadata,
3122            HydroNode::Chain { metadata, .. } => metadata,
3123            HydroNode::ChainFirst { metadata, .. } => metadata,
3124            HydroNode::CrossProduct { metadata, .. } => metadata,
3125            HydroNode::CrossSingleton { metadata, .. } => metadata,
3126            HydroNode::Join { metadata, .. } => metadata,
3127            HydroNode::Difference { metadata, .. } => metadata,
3128            HydroNode::AntiJoin { metadata, .. } => metadata,
3129            HydroNode::ResolveFutures { metadata, .. } => metadata,
3130            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3131            HydroNode::Map { metadata, .. } => metadata,
3132            HydroNode::FlatMap { metadata, .. } => metadata,
3133            HydroNode::Filter { metadata, .. } => metadata,
3134            HydroNode::FilterMap { metadata, .. } => metadata,
3135            HydroNode::DeferTick { metadata, .. } => metadata,
3136            HydroNode::Enumerate { metadata, .. } => metadata,
3137            HydroNode::Inspect { metadata, .. } => metadata,
3138            HydroNode::Unique { metadata, .. } => metadata,
3139            HydroNode::Sort { metadata, .. } => metadata,
3140            HydroNode::Scan { metadata, .. } => metadata,
3141            HydroNode::Fold { metadata, .. } => metadata,
3142            HydroNode::FoldKeyed { metadata, .. } => metadata,
3143            HydroNode::Reduce { metadata, .. } => metadata,
3144            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3145            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3146            HydroNode::ExternalInput { metadata, .. } => metadata,
3147            HydroNode::Network { metadata, .. } => metadata,
3148            HydroNode::Counter { metadata, .. } => metadata,
3149        }
3150    }
3151
3152    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3153        &mut self.metadata_mut().op
3154    }
3155
3156    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3157        match self {
3158            HydroNode::Placeholder => {
3159                panic!()
3160            }
3161            HydroNode::Cast { metadata, .. } => metadata,
3162            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3163            HydroNode::Source { metadata, .. } => metadata,
3164            HydroNode::CycleSource { metadata, .. } => metadata,
3165            HydroNode::Tee { metadata, .. } => metadata,
3166            HydroNode::Persist { metadata, .. } => metadata,
3167            HydroNode::YieldConcat { metadata, .. } => metadata,
3168            HydroNode::BeginAtomic { metadata, .. } => metadata,
3169            HydroNode::EndAtomic { metadata, .. } => metadata,
3170            HydroNode::Batch { metadata, .. } => metadata,
3171            HydroNode::Chain { metadata, .. } => metadata,
3172            HydroNode::ChainFirst { metadata, .. } => metadata,
3173            HydroNode::CrossProduct { metadata, .. } => metadata,
3174            HydroNode::CrossSingleton { metadata, .. } => metadata,
3175            HydroNode::Join { metadata, .. } => metadata,
3176            HydroNode::Difference { metadata, .. } => metadata,
3177            HydroNode::AntiJoin { metadata, .. } => metadata,
3178            HydroNode::ResolveFutures { metadata, .. } => metadata,
3179            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3180            HydroNode::Map { metadata, .. } => metadata,
3181            HydroNode::FlatMap { metadata, .. } => metadata,
3182            HydroNode::Filter { metadata, .. } => metadata,
3183            HydroNode::FilterMap { metadata, .. } => metadata,
3184            HydroNode::DeferTick { metadata, .. } => metadata,
3185            HydroNode::Enumerate { metadata, .. } => metadata,
3186            HydroNode::Inspect { metadata, .. } => metadata,
3187            HydroNode::Unique { metadata, .. } => metadata,
3188            HydroNode::Sort { metadata, .. } => metadata,
3189            HydroNode::Scan { metadata, .. } => metadata,
3190            HydroNode::Fold { metadata, .. } => metadata,
3191            HydroNode::FoldKeyed { metadata, .. } => metadata,
3192            HydroNode::Reduce { metadata, .. } => metadata,
3193            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3194            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3195            HydroNode::ExternalInput { metadata, .. } => metadata,
3196            HydroNode::Network { metadata, .. } => metadata,
3197            HydroNode::Counter { metadata, .. } => metadata,
3198        }
3199    }
3200
3201    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3202        match self {
3203            HydroNode::Placeholder => {
3204                panic!()
3205            }
3206            HydroNode::Source { .. }
3207            | HydroNode::ExternalInput { .. }
3208            | HydroNode::CycleSource { .. } // CycleSource and Tee should calculate input metadata in separate special ways
3209            | HydroNode::Tee { .. } => {
3210                vec![]
3211            }
3212            HydroNode::Cast { inner, .. }
3213            | HydroNode::ObserveNonDet { inner, .. }
3214            | HydroNode::Persist { inner, .. }
3215            | HydroNode::YieldConcat { inner, .. }
3216            | HydroNode::BeginAtomic { inner, .. }
3217            | HydroNode::EndAtomic { inner, .. }
3218            | HydroNode::Batch { inner, .. } => {
3219                vec![inner.metadata()]
3220            }
3221            HydroNode::Chain { first, second, .. } => {
3222                vec![first.metadata(), second.metadata()]
3223            }
3224            HydroNode::ChainFirst { first, second, .. } => {
3225                vec![first.metadata(), second.metadata()]
3226            }
3227            HydroNode::CrossProduct { left, right, .. }
3228            | HydroNode::CrossSingleton { left, right, .. }
3229            | HydroNode::Join { left, right, .. } => {
3230                vec![left.metadata(), right.metadata()]
3231            }
3232            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3233                vec![pos.metadata(), neg.metadata()]
3234            }
3235            HydroNode::Map { input, .. }
3236            | HydroNode::FlatMap { input, .. }
3237            | HydroNode::Filter { input, .. }
3238            | HydroNode::FilterMap { input, .. }
3239            | HydroNode::Sort { input, .. }
3240            | HydroNode::DeferTick { input, .. }
3241            | HydroNode::Enumerate { input, .. }
3242            | HydroNode::Inspect { input, .. }
3243            | HydroNode::Unique { input, .. }
3244            | HydroNode::Network { input, .. }
3245            | HydroNode::Counter { input, .. }
3246            | HydroNode::ResolveFutures { input, .. }
3247            | HydroNode::ResolveFuturesOrdered { input, .. } => {
3248                vec![input.metadata()]
3249            }
3250            HydroNode::Fold { input, .. }
3251            | HydroNode::FoldKeyed { input, .. }
3252            | HydroNode::Reduce { input, .. }
3253            | HydroNode::ReduceKeyed { input, .. }
3254            | HydroNode::Scan { input, .. } => {
3255                // Skip persist before fold/reduce
3256                if let HydroNode::Persist { inner, .. } = input.as_ref() {
3257                    vec![inner.metadata()]
3258                } else {
3259                    vec![input.metadata()]
3260                }
3261            }
3262            HydroNode::ReduceKeyedWatermark { input, watermark, .. } => {
3263                // Skip persist before fold/reduce
3264                if let HydroNode::Persist { inner, .. } = input.as_ref() {
3265                    vec![inner.metadata(), watermark.metadata()]
3266                } else {
3267                    vec![input.metadata(), watermark.metadata()]
3268                }
3269            }
3270        }
3271    }
3272
3273    pub fn print_root(&self) -> String {
3274        match self {
3275            HydroNode::Placeholder => {
3276                panic!()
3277            }
3278            HydroNode::Cast { .. } => "Cast()".to_string(),
3279            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3280            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3281            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3282            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3283            HydroNode::Persist { .. } => "Persist()".to_string(),
3284            HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3285            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3286            HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3287            HydroNode::Batch { .. } => "Batch()".to_string(),
3288            HydroNode::Chain { first, second, .. } => {
3289                format!("Chain({}, {})", first.print_root(), second.print_root())
3290            }
3291            HydroNode::ChainFirst { first, second, .. } => {
3292                format!(
3293                    "ChainFirst({}, {})",
3294                    first.print_root(),
3295                    second.print_root()
3296                )
3297            }
3298            HydroNode::CrossProduct { left, right, .. } => {
3299                format!(
3300                    "CrossProduct({}, {})",
3301                    left.print_root(),
3302                    right.print_root()
3303                )
3304            }
3305            HydroNode::CrossSingleton { left, right, .. } => {
3306                format!(
3307                    "CrossSingleton({}, {})",
3308                    left.print_root(),
3309                    right.print_root()
3310                )
3311            }
3312            HydroNode::Join { left, right, .. } => {
3313                format!("Join({}, {})", left.print_root(), right.print_root())
3314            }
3315            HydroNode::Difference { pos, neg, .. } => {
3316                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3317            }
3318            HydroNode::AntiJoin { pos, neg, .. } => {
3319                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3320            }
3321            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3322            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3323            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3324            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3325            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3326            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3327            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3328            HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3329            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3330            HydroNode::Unique { .. } => "Unique()".to_string(),
3331            HydroNode::Sort { .. } => "Sort()".to_string(),
3332            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3333            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3334            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3335            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3336            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3337            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3338            HydroNode::Network { .. } => "Network()".to_string(),
3339            HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3340            HydroNode::Counter { tag, duration, .. } => {
3341                format!("Counter({:?}, {:?})", tag, duration)
3342            }
3343        }
3344    }
3345}
3346
/// Builds the sink/source expressions and the connect callback for a network
/// edge between two locations.
///
/// The endpoints are resolved against `processes` / `clusters`, one port is
/// allocated on each side through `D`, and the matching pair of `D` functions
/// is invoked depending on the endpoint kinds:
///
/// * process → process: `o2o_sink_source` / `o2o_connect`
/// * process → cluster: `o2m_sink_source` / `o2m_connect`
/// * cluster → process: `m2o_sink_source` / `m2o_connect`
/// * cluster → cluster: `m2m_sink_source` / `m2m_connect`
///
/// Returns `(sink, source, connect_fn)`.
///
/// # Panics
///
/// Panics if an endpoint id is missing from the corresponding map, or if
/// either location is a `LocationId::Tick` or `LocationId::Atomic`.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    compile_env: &D::CompileEnv,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Looks up an instantiated process by id and returns a clone of it.
    let process_node = |id: &usize| match processes.get(id) {
        Some(node) => node.clone(),
        None => panic!("A process used in the graph was not instantiated: {}", id),
    };
    // Looks up an instantiated cluster by id and returns a clone of it.
    let cluster_node = |id: &usize| match clusters.get(id) {
        Some(node) => node.clone(),
        None => panic!("A cluster used in the graph was not instantiated: {}", id),
    };

    let (endpoints, connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let sender = process_node(from);
            let receiver = process_node(to);

            let sink_port = D::allocate_process_port(&sender);
            let source_port = D::allocate_process_port(&receiver);

            (
                D::o2o_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port),
                D::o2o_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let sender = process_node(from);
            let receiver = cluster_node(to);

            let sink_port = D::allocate_process_port(&sender);
            let source_port = D::allocate_cluster_port(&receiver);

            (
                D::o2m_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port),
                D::o2m_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let sender = cluster_node(from);
            let receiver = process_node(to);

            let sink_port = D::allocate_cluster_port(&sender);
            let source_port = D::allocate_process_port(&receiver);

            (
                D::m2o_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port),
                D::m2o_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let sender = cluster_node(from);
            let receiver = cluster_node(to);

            let sink_port = D::allocate_cluster_port(&sender);
            let source_port = D::allocate_cluster_port(&receiver);

            (
                D::m2m_sink_source(compile_env, &sender, &sink_port, &receiver, &source_port),
                D::m2m_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        // Tick and Atomic locations are not handled on either side of a network edge.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(),
    };

    let (sink, source) = endpoints;
    (sink, source, connect_fn)
}
3454
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` at 264 bytes.
    #[test]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 264);
    }

    /// Pins the in-memory size of `HydroRoot` at 160 bytes.
    #[test]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 160);
    }

    /// An expression that contains no `q!` expansion must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain: syn::Expr = syn::parse_str("x + y").unwrap();
        assert_eq!(simplify_q_macro(plain.clone()), plain);
    }

    /// Runs `simplify_q_macro` over the expression spliced from a real
    /// stageleft `q!` closure and compares the result against the snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        // The visitor is expected to recognise the
        // stageleft::runtime_support::fn_* pattern and simplify it to q!(...).
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as above, but the quoted body is a block that ends in a closure
    /// rather than starting with a pipe.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}