hydro_lang/
ir.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21
22#[cfg(feature = "build")]
23use crate::deploy::{Deploy, RegisterPort};
24use crate::location::LocationId;
25
26/// Debug displays the type's tokens.
27///
28/// Boxes `syn::Type` which is ~240 bytes.
29#[derive(Clone, Hash)]
30pub struct DebugExpr(pub Box<syn::Expr>);
31
32impl From<syn::Expr> for DebugExpr {
33    fn from(expr: syn::Expr) -> Self {
34        Self(Box::new(expr))
35    }
36}
37
38impl Deref for DebugExpr {
39    type Target = syn::Expr;
40
41    fn deref(&self) -> &Self::Target {
42        &self.0
43    }
44}
45
46impl ToTokens for DebugExpr {
47    fn to_tokens(&self, tokens: &mut TokenStream) {
48        self.0.to_tokens(tokens);
49    }
50}
51
52impl Debug for DebugExpr {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        write!(f, "{}", self.0.to_token_stream())
55    }
56}
57
58/// Debug displays the type's tokens.
59///
60/// Boxes `syn::Type` which is ~320 bytes.
61#[derive(Clone, Hash)]
62pub struct DebugType(pub Box<syn::Type>);
63
64impl From<syn::Type> for DebugType {
65    fn from(t: syn::Type) -> Self {
66        Self(Box::new(t))
67    }
68}
69
70impl Deref for DebugType {
71    type Target = syn::Type;
72
73    fn deref(&self) -> &Self::Target {
74        &self.0
75    }
76}
77
78impl ToTokens for DebugType {
79    fn to_tokens(&self, tokens: &mut TokenStream) {
80        self.0.to_tokens(tokens);
81    }
82}
83
84impl Debug for DebugType {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        write!(f, "{}", self.0.to_token_stream())
87    }
88}
89
90pub enum DebugInstantiate {
91    Building,
92    Finalized(Box<DebugInstantiateFinalized>),
93}
94
95#[cfg_attr(
96    not(feature = "build"),
97    expect(
98        dead_code,
99        reason = "sink, source unused without `feature = \"build\"`."
100    )
101)]
102pub struct DebugInstantiateFinalized {
103    sink: syn::Expr,
104    source: syn::Expr,
105    connect_fn: Option<Box<dyn FnOnce()>>,
106}
107
108impl From<DebugInstantiateFinalized> for DebugInstantiate {
109    fn from(f: DebugInstantiateFinalized) -> Self {
110        Self::Finalized(Box::new(f))
111    }
112}
113
114impl Debug for DebugInstantiate {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        write!(f, "<network instantiate>")
117    }
118}
119
120impl Hash for DebugInstantiate {
121    fn hash<H: Hasher>(&self, _state: &mut H) {
122        // Do nothing
123    }
124}
125
126impl Clone for DebugInstantiate {
127    fn clone(&self) -> Self {
128        match self {
129            DebugInstantiate::Building => DebugInstantiate::Building,
130            DebugInstantiate::Finalized(_) => {
131                panic!("DebugInstantiate::Finalized should not be cloned")
132            }
133        }
134    }
135}
136
137/// A source in a Hydro graph, where data enters the graph.
138#[derive(Debug, Hash, Clone)]
139pub enum HydroSource {
140    Stream(DebugExpr),
141    ExternalNetwork(),
142    Iter(DebugExpr),
143    Spin(),
144}
145
146#[cfg(feature = "build")]
147pub enum BuildersOrCallback<'a, L, N>
148where
149    L: FnMut(&mut HydroLeaf, &mut usize),
150    N: FnMut(&mut HydroNode, &mut usize),
151{
152    Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
153    Callback(L, N),
154}
155
156/// An leaf in a Hydro graph, which is an pipeline that doesn't emit
157/// any downstream values. Traversals over the dataflow graph and
158/// generating DFIR IR start from leaves.
159#[derive(Debug, Hash)]
160pub enum HydroLeaf {
161    ForEach {
162        f: DebugExpr,
163        input: Box<HydroNode>,
164        metadata: HydroIrMetadata,
165    },
166    DestSink {
167        sink: DebugExpr,
168        input: Box<HydroNode>,
169        metadata: HydroIrMetadata,
170    },
171    CycleSink {
172        ident: syn::Ident,
173        location_kind: LocationId,
174        input: Box<HydroNode>,
175        metadata: HydroIrMetadata,
176    },
177}
178
179impl HydroLeaf {
180    #[cfg(feature = "build")]
181    pub fn compile_network<'a, D>(
182        &mut self,
183        compile_env: &D::CompileEnv,
184        seen_tees: &mut SeenTees,
185        seen_tee_locations: &mut SeenTeeLocations,
186        processes: &HashMap<usize, D::Process>,
187        clusters: &HashMap<usize, D::Cluster>,
188        externals: &HashMap<usize, D::ExternalProcess>,
189    ) where
190        D: Deploy<'a>,
191    {
192        self.transform_children(
193            |n, s| {
194                n.compile_network::<D>(
195                    compile_env,
196                    s,
197                    seen_tee_locations,
198                    processes,
199                    clusters,
200                    externals,
201                );
202            },
203            seen_tees,
204        )
205    }
206
207    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
208        self.transform_children(
209            |n, s| {
210                n.connect_network(s);
211            },
212            seen_tees,
213        )
214    }
215
216    pub fn transform_bottom_up(
217        &mut self,
218        transform_leaf: &mut impl FnMut(&mut HydroLeaf),
219        transform_node: &mut impl FnMut(&mut HydroNode),
220        seen_tees: &mut SeenTees,
221    ) {
222        self.transform_children(|n, s| n.transform_bottom_up(transform_node, s), seen_tees);
223
224        transform_leaf(self);
225    }
226
227    pub fn transform_children(
228        &mut self,
229        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
230        seen_tees: &mut SeenTees,
231    ) {
232        match self {
233            HydroLeaf::ForEach { f: _, input, .. }
234            | HydroLeaf::DestSink { sink: _, input, .. }
235            | HydroLeaf::CycleSink {
236                ident: _,
237                location_kind: _,
238                input,
239                ..
240            } => {
241                transform(input, seen_tees);
242            }
243        }
244    }
245
246    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroLeaf {
247        match self {
248            HydroLeaf::ForEach { f, input, metadata } => HydroLeaf::ForEach {
249                f: f.clone(),
250                input: Box::new(input.deep_clone(seen_tees)),
251                metadata: metadata.clone(),
252            },
253            HydroLeaf::DestSink {
254                sink,
255                input,
256                metadata,
257            } => HydroLeaf::DestSink {
258                sink: sink.clone(),
259                input: Box::new(input.deep_clone(seen_tees)),
260                metadata: metadata.clone(),
261            },
262            HydroLeaf::CycleSink {
263                ident,
264                location_kind,
265                input,
266                metadata,
267            } => HydroLeaf::CycleSink {
268                ident: ident.clone(),
269                location_kind: location_kind.clone(),
270                input: Box::new(input.deep_clone(seen_tees)),
271                metadata: metadata.clone(),
272            },
273        }
274    }
275
276    #[cfg(feature = "build")]
277    pub fn emit(
278        &mut self,
279        graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
280        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
281        next_stmt_id: &mut usize,
282    ) {
283        self.emit_core(
284            &mut BuildersOrCallback::Builders::<
285                fn(&mut HydroLeaf, &mut usize),
286                fn(&mut HydroNode, &mut usize),
287            >(graph_builders),
288            built_tees,
289            next_stmt_id,
290        );
291    }
292
293    #[cfg(feature = "build")]
294    pub fn emit_core(
295        &mut self,
296        builders_or_callback: &mut BuildersOrCallback<
297            impl FnMut(&mut HydroLeaf, &mut usize),
298            impl FnMut(&mut HydroNode, &mut usize),
299        >,
300        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
301        next_stmt_id: &mut usize,
302    ) {
303        match self {
304            HydroLeaf::ForEach { f, input, .. } => {
305                let (input_ident, input_location_id) =
306                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
307
308                match builders_or_callback {
309                    BuildersOrCallback::Builders(graph_builders) => {
310                        graph_builders
311                            .entry(input_location_id)
312                            .or_default()
313                            .add_dfir(
314                                parse_quote! {
315                                    #input_ident -> for_each(#f);
316                                },
317                                None,
318                                Some(&next_stmt_id.to_string()),
319                            );
320                    }
321                    BuildersOrCallback::Callback(leaf_callback, _) => {
322                        leaf_callback(self, next_stmt_id);
323                    }
324                }
325
326                *next_stmt_id += 1;
327            }
328
329            HydroLeaf::DestSink { sink, input, .. } => {
330                let (input_ident, input_location_id) =
331                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
332
333                match builders_or_callback {
334                    BuildersOrCallback::Builders(graph_builders) => {
335                        graph_builders
336                            .entry(input_location_id)
337                            .or_default()
338                            .add_dfir(
339                                parse_quote! {
340                                    #input_ident -> dest_sink(#sink);
341                                },
342                                None,
343                                Some(&next_stmt_id.to_string()),
344                            );
345                    }
346                    BuildersOrCallback::Callback(leaf_callback, _) => {
347                        leaf_callback(self, next_stmt_id);
348                    }
349                }
350
351                *next_stmt_id += 1;
352            }
353
354            HydroLeaf::CycleSink {
355                ident,
356                location_kind,
357                input,
358                ..
359            } => {
360                let (input_ident, input_location_id) =
361                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
362
363                let location_id = match location_kind.root() {
364                    LocationId::Process(id) => id,
365                    LocationId::Cluster(id) => id,
366                    LocationId::Tick(_, _) => panic!(),
367                    LocationId::ExternalProcess(_) => panic!(),
368                };
369
370                assert_eq!(
371                    input_location_id, *location_id,
372                    "cycle_sink location mismatch"
373                );
374
375                match builders_or_callback {
376                    BuildersOrCallback::Builders(graph_builders) => {
377                        graph_builders.entry(*location_id).or_default().add_dfir(
378                            parse_quote! {
379                                #ident = #input_ident;
380                            },
381                            None,
382                            None,
383                        );
384                    }
385                    // No ID, no callback
386                    BuildersOrCallback::Callback(_, _) => {}
387                }
388            }
389        }
390    }
391
392    pub fn metadata(&self) -> &HydroIrMetadata {
393        match self {
394            HydroLeaf::ForEach { metadata, .. }
395            | HydroLeaf::DestSink { metadata, .. }
396            | HydroLeaf::CycleSink { metadata, .. } => metadata,
397        }
398    }
399
400    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
401        match self {
402            HydroLeaf::ForEach { metadata, .. }
403            | HydroLeaf::DestSink { metadata, .. }
404            | HydroLeaf::CycleSink { metadata, .. } => metadata,
405        }
406    }
407
408    pub fn print_root(&self) -> String {
409        match self {
410            HydroLeaf::ForEach { f, .. } => format!("ForEach({:?})", f),
411            HydroLeaf::DestSink { sink, .. } => format!("DestSink({:?})", sink),
412            HydroLeaf::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
413        }
414    }
415}
416
417#[cfg(feature = "build")]
418pub fn emit(ir: &mut Vec<HydroLeaf>) -> BTreeMap<usize, FlatGraphBuilder> {
419    let mut builders = BTreeMap::new();
420    let mut built_tees = HashMap::new();
421    let mut next_stmt_id = 0;
422    for leaf in ir {
423        leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
424    }
425    builders
426}
427
428#[cfg(feature = "build")]
429pub fn traverse_dfir(
430    ir: &mut [HydroLeaf],
431    transform_leaf: impl FnMut(&mut HydroLeaf, &mut usize),
432    transform_node: impl FnMut(&mut HydroNode, &mut usize),
433) {
434    let mut seen_tees = HashMap::new();
435    let mut next_stmt_id = 0;
436    let mut callback = BuildersOrCallback::Callback(transform_leaf, transform_node);
437    ir.iter_mut().for_each(|leaf| {
438        leaf.emit_core(&mut callback, &mut seen_tees, &mut next_stmt_id);
439    });
440}
441
442pub fn transform_bottom_up(
443    ir: &mut [HydroLeaf],
444    transform_leaf: &mut impl FnMut(&mut HydroLeaf),
445    transform_node: &mut impl FnMut(&mut HydroNode),
446) {
447    let mut seen_tees = HashMap::new();
448    ir.iter_mut().for_each(|leaf| {
449        leaf.transform_bottom_up(transform_leaf, transform_node, &mut seen_tees);
450    });
451}
452
453pub fn deep_clone(ir: &[HydroLeaf]) -> Vec<HydroLeaf> {
454    let mut seen_tees = HashMap::new();
455    ir.iter()
456        .map(|leaf| leaf.deep_clone(&mut seen_tees))
457        .collect()
458}
459
460type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
461thread_local! {
462    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
463}
464
465pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
466    PRINTED_TEES.with(|printed_tees| {
467        let mut printed_tees_mut = printed_tees.borrow_mut();
468        *printed_tees_mut = Some((0, HashMap::new()));
469        drop(printed_tees_mut);
470
471        let ret = f();
472
473        let mut printed_tees_mut = printed_tees.borrow_mut();
474        *printed_tees_mut = None;
475
476        ret
477    })
478}
479
480pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
481
482impl TeeNode {
483    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
484        Rc::as_ptr(&self.0)
485    }
486}
487
488impl Debug for TeeNode {
489    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
490        PRINTED_TEES.with(|printed_tees| {
491            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
492            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
493
494            if let Some(printed_tees_mut) = printed_tees_mut {
495                if let Some(existing) = printed_tees_mut
496                    .1
497                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
498                {
499                    write!(f, "<tee {}>", existing)
500                } else {
501                    let next_id = printed_tees_mut.0;
502                    printed_tees_mut.0 += 1;
503                    printed_tees_mut
504                        .1
505                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
506                    drop(printed_tees_mut_borrow);
507                    write!(f, "<tee {}>: ", next_id)?;
508                    Debug::fmt(&self.0.borrow(), f)
509                }
510            } else {
511                drop(printed_tees_mut_borrow);
512                write!(f, "<tee>: ")?;
513                Debug::fmt(&self.0.borrow(), f)
514            }
515        })
516    }
517}
518
519impl Hash for TeeNode {
520    fn hash<H: Hasher>(&self, state: &mut H) {
521        self.0.borrow_mut().hash(state);
522    }
523}
524
525#[derive(Debug, Clone)]
526pub struct HydroIrMetadata {
527    pub location_kind: LocationId,
528    pub output_type: Option<DebugType>,
529    pub cardinality: Option<usize>,
530    pub cpu_usage: Option<f64>,
531}
532
533// HydroIrMetadata shouldn't be used to hash or compare
534impl Hash for HydroIrMetadata {
535    fn hash<H: Hasher>(&self, _: &mut H) {}
536}
537
538impl PartialEq for HydroIrMetadata {
539    fn eq(&self, _: &Self) -> bool {
540        true
541    }
542}
543
544impl Eq for HydroIrMetadata {}
545
546/// An intermediate node in a Hydro graph, which consumes data
547/// from upstream nodes and emits data to downstream nodes.
548#[derive(Debug, Hash)]
549pub enum HydroNode {
550    Placeholder,
551
552    Source {
553        source: HydroSource,
554        location_kind: LocationId,
555        metadata: HydroIrMetadata,
556    },
557
558    CycleSource {
559        ident: syn::Ident,
560        location_kind: LocationId,
561        metadata: HydroIrMetadata,
562    },
563
564    Tee {
565        inner: TeeNode,
566        metadata: HydroIrMetadata,
567    },
568
569    Persist {
570        inner: Box<HydroNode>,
571        metadata: HydroIrMetadata,
572    },
573
574    Unpersist {
575        inner: Box<HydroNode>,
576        metadata: HydroIrMetadata,
577    },
578
579    Delta {
580        inner: Box<HydroNode>,
581        metadata: HydroIrMetadata,
582    },
583
584    Chain {
585        first: Box<HydroNode>,
586        second: Box<HydroNode>,
587        metadata: HydroIrMetadata,
588    },
589
590    CrossProduct {
591        left: Box<HydroNode>,
592        right: Box<HydroNode>,
593        metadata: HydroIrMetadata,
594    },
595
596    CrossSingleton {
597        left: Box<HydroNode>,
598        right: Box<HydroNode>,
599        metadata: HydroIrMetadata,
600    },
601
602    Join {
603        left: Box<HydroNode>,
604        right: Box<HydroNode>,
605        metadata: HydroIrMetadata,
606    },
607
608    Difference {
609        pos: Box<HydroNode>,
610        neg: Box<HydroNode>,
611        metadata: HydroIrMetadata,
612    },
613
614    AntiJoin {
615        pos: Box<HydroNode>,
616        neg: Box<HydroNode>,
617        metadata: HydroIrMetadata,
618    },
619
620    ResolveFutures {
621        input: Box<HydroNode>,
622        metadata: HydroIrMetadata,
623    },
624    ResolveFuturesOrdered {
625        input: Box<HydroNode>,
626        metadata: HydroIrMetadata,
627    },
628
629    Map {
630        f: DebugExpr,
631        input: Box<HydroNode>,
632        metadata: HydroIrMetadata,
633    },
634    FlatMap {
635        f: DebugExpr,
636        input: Box<HydroNode>,
637        metadata: HydroIrMetadata,
638    },
639    Filter {
640        f: DebugExpr,
641        input: Box<HydroNode>,
642        metadata: HydroIrMetadata,
643    },
644    FilterMap {
645        f: DebugExpr,
646        input: Box<HydroNode>,
647        metadata: HydroIrMetadata,
648    },
649
650    DeferTick {
651        input: Box<HydroNode>,
652        metadata: HydroIrMetadata,
653    },
654    Enumerate {
655        is_static: bool,
656        input: Box<HydroNode>,
657        metadata: HydroIrMetadata,
658    },
659    Inspect {
660        f: DebugExpr,
661        input: Box<HydroNode>,
662        metadata: HydroIrMetadata,
663    },
664
665    Unique {
666        input: Box<HydroNode>,
667        metadata: HydroIrMetadata,
668    },
669
670    Sort {
671        input: Box<HydroNode>,
672        metadata: HydroIrMetadata,
673    },
674    Fold {
675        init: DebugExpr,
676        acc: DebugExpr,
677        input: Box<HydroNode>,
678        metadata: HydroIrMetadata,
679    },
680    FoldKeyed {
681        init: DebugExpr,
682        acc: DebugExpr,
683        input: Box<HydroNode>,
684        metadata: HydroIrMetadata,
685    },
686
687    Reduce {
688        f: DebugExpr,
689        input: Box<HydroNode>,
690        metadata: HydroIrMetadata,
691    },
692    ReduceKeyed {
693        f: DebugExpr,
694        input: Box<HydroNode>,
695        metadata: HydroIrMetadata,
696    },
697
698    Network {
699        from_key: Option<usize>,
700        to_location: LocationId,
701        to_key: Option<usize>,
702        serialize_fn: Option<DebugExpr>,
703        instantiate_fn: DebugInstantiate,
704        deserialize_fn: Option<DebugExpr>,
705        input: Box<HydroNode>,
706        metadata: HydroIrMetadata,
707    },
708
709    Counter {
710        tag: String,
711        duration: DebugExpr,
712        input: Box<HydroNode>,
713        metadata: HydroIrMetadata,
714    },
715}
716
717pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
718pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
719
720impl HydroNode {
721    #[cfg(feature = "build")]
722    pub fn compile_network<'a, D>(
723        &mut self,
724        compile_env: &D::CompileEnv,
725        seen_tees: &mut SeenTees,
726        seen_tee_locations: &mut SeenTeeLocations,
727        nodes: &HashMap<usize, D::Process>,
728        clusters: &HashMap<usize, D::Cluster>,
729        externals: &HashMap<usize, D::ExternalProcess>,
730    ) where
731        D: Deploy<'a>,
732    {
733        let mut curr_location = None;
734
735        self.transform_bottom_up(
736            &mut |n| {
737                if let HydroNode::Network {
738                    from_key,
739                    to_location,
740                    to_key,
741                    instantiate_fn,
742                    ..
743                } = n
744                {
745                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
746                        DebugInstantiate::Building => instantiate_network::<D>(
747                            curr_location.as_ref().unwrap(),
748                            *from_key,
749                            to_location,
750                            *to_key,
751                            nodes,
752                            clusters,
753                            externals,
754                            compile_env,
755                        ),
756
757                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
758                    };
759
760                    *instantiate_fn = DebugInstantiateFinalized {
761                        sink: sink_expr,
762                        source: source_expr,
763                        connect_fn: Some(connect_fn),
764                    }
765                    .into();
766                }
767
768                // Calculate location of current node to use as from_location
769                match n {
770                    HydroNode::Network {
771                        to_location: location_kind,
772                        ..
773                    }
774                    | HydroNode::CycleSource { location_kind, .. }
775                    | HydroNode::Source { location_kind, .. } => {
776                        // Unwrap location out of Tick
777                        if let LocationId::Tick(_, tick_loc) = location_kind {
778                            curr_location = Some(*tick_loc.clone());
779                        } else {
780                            curr_location = Some(location_kind.clone());
781                        }
782                    }
783                    HydroNode::Tee { inner, .. } => {
784                        if let Some(tee_location) = seen_tee_locations.get(&inner.as_ptr()) {
785                            curr_location = Some(tee_location.clone());
786                        } else {
787                            seen_tee_locations
788                                .insert(inner.as_ptr(), curr_location.as_ref().unwrap().clone());
789                        }
790                    }
791                    _ => {}
792                }
793            },
794            seen_tees,
795        );
796    }
797
798    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
799        self.transform_bottom_up(
800            &mut |n| {
801                if let HydroNode::Network { instantiate_fn, .. } = n {
802                    match instantiate_fn {
803                        DebugInstantiate::Building => panic!("network not built"),
804
805                        DebugInstantiate::Finalized(finalized) => {
806                            (finalized.connect_fn.take().unwrap())();
807                        }
808                    }
809                }
810            },
811            seen_tees,
812        );
813    }
814
815    pub fn transform_bottom_up(
816        &mut self,
817        transform: &mut impl FnMut(&mut HydroNode),
818        seen_tees: &mut SeenTees,
819    ) {
820        self.transform_children(|n, s| n.transform_bottom_up(transform, s), seen_tees);
821
822        transform(self);
823    }
824
825    #[inline(always)]
826    pub fn transform_children(
827        &mut self,
828        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
829        seen_tees: &mut SeenTees,
830    ) {
831        match self {
832            HydroNode::Placeholder => {
833                panic!();
834            }
835
836            HydroNode::Source { .. } | HydroNode::CycleSource { .. } => {}
837
838            HydroNode::Tee { inner, .. } => {
839                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
840                    *inner = TeeNode(transformed.clone());
841                } else {
842                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
843                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
844                    let mut orig = inner.0.replace(HydroNode::Placeholder);
845                    transform(&mut orig, seen_tees);
846                    *transformed_cell.borrow_mut() = orig;
847                    *inner = TeeNode(transformed_cell);
848                }
849            }
850
851            HydroNode::Persist { inner, .. }
852            | HydroNode::Unpersist { inner, .. }
853            | HydroNode::Delta { inner, .. } => {
854                transform(inner.as_mut(), seen_tees);
855            }
856
857            HydroNode::Chain { first, second, .. } => {
858                transform(first.as_mut(), seen_tees);
859                transform(second.as_mut(), seen_tees);
860            }
861
862            HydroNode::CrossSingleton { left, right, .. }
863            | HydroNode::CrossProduct { left, right, .. }
864            | HydroNode::Join { left, right, .. } => {
865                transform(left.as_mut(), seen_tees);
866                transform(right.as_mut(), seen_tees);
867            }
868
869            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
870                transform(pos.as_mut(), seen_tees);
871                transform(neg.as_mut(), seen_tees);
872            }
873
874            HydroNode::Map { input, .. }
875            | HydroNode::ResolveFutures { input, .. }
876            | HydroNode::ResolveFuturesOrdered { input, .. }
877            | HydroNode::FlatMap { input, .. }
878            | HydroNode::Filter { input, .. }
879            | HydroNode::FilterMap { input, .. }
880            | HydroNode::Sort { input, .. }
881            | HydroNode::DeferTick { input, .. }
882            | HydroNode::Enumerate { input, .. }
883            | HydroNode::Inspect { input, .. }
884            | HydroNode::Unique { input, .. }
885            | HydroNode::Network { input, .. }
886            | HydroNode::Fold { input, .. }
887            | HydroNode::FoldKeyed { input, .. }
888            | HydroNode::Reduce { input, .. }
889            | HydroNode::ReduceKeyed { input, .. }
890            | HydroNode::Counter { input, .. } => {
891                transform(input.as_mut(), seen_tees);
892            }
893        }
894    }
895
896    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
897        match self {
898            HydroNode::Placeholder => HydroNode::Placeholder,
899            HydroNode::Source {
900                source,
901                location_kind,
902                metadata,
903            } => HydroNode::Source {
904                source: source.clone(),
905                location_kind: location_kind.clone(),
906                metadata: metadata.clone(),
907            },
908            HydroNode::CycleSource {
909                ident,
910                location_kind,
911                metadata,
912            } => HydroNode::CycleSource {
913                ident: ident.clone(),
914                location_kind: location_kind.clone(),
915                metadata: metadata.clone(),
916            },
917            HydroNode::Tee { inner, metadata } => {
918                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
919                    HydroNode::Tee {
920                        inner: TeeNode(transformed.clone()),
921                        metadata: metadata.clone(),
922                    }
923                } else {
924                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
925                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
926                    let cloned = inner.0.borrow().deep_clone(seen_tees);
927                    *new_rc.borrow_mut() = cloned;
928                    HydroNode::Tee {
929                        inner: TeeNode(new_rc),
930                        metadata: metadata.clone(),
931                    }
932                }
933            }
934            HydroNode::Persist { inner, metadata } => HydroNode::Persist {
935                inner: Box::new(inner.deep_clone(seen_tees)),
936                metadata: metadata.clone(),
937            },
938            HydroNode::Unpersist { inner, metadata } => HydroNode::Unpersist {
939                inner: Box::new(inner.deep_clone(seen_tees)),
940                metadata: metadata.clone(),
941            },
942            HydroNode::Delta { inner, metadata } => HydroNode::Delta {
943                inner: Box::new(inner.deep_clone(seen_tees)),
944                metadata: metadata.clone(),
945            },
946            HydroNode::Chain {
947                first,
948                second,
949                metadata,
950            } => HydroNode::Chain {
951                first: Box::new(first.deep_clone(seen_tees)),
952                second: Box::new(second.deep_clone(seen_tees)),
953                metadata: metadata.clone(),
954            },
955            HydroNode::CrossProduct {
956                left,
957                right,
958                metadata,
959            } => HydroNode::CrossProduct {
960                left: Box::new(left.deep_clone(seen_tees)),
961                right: Box::new(right.deep_clone(seen_tees)),
962                metadata: metadata.clone(),
963            },
964            HydroNode::CrossSingleton {
965                left,
966                right,
967                metadata,
968            } => HydroNode::CrossSingleton {
969                left: Box::new(left.deep_clone(seen_tees)),
970                right: Box::new(right.deep_clone(seen_tees)),
971                metadata: metadata.clone(),
972            },
973            HydroNode::Join {
974                left,
975                right,
976                metadata,
977            } => HydroNode::Join {
978                left: Box::new(left.deep_clone(seen_tees)),
979                right: Box::new(right.deep_clone(seen_tees)),
980                metadata: metadata.clone(),
981            },
982            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
983                pos: Box::new(pos.deep_clone(seen_tees)),
984                neg: Box::new(neg.deep_clone(seen_tees)),
985                metadata: metadata.clone(),
986            },
987            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
988                pos: Box::new(pos.deep_clone(seen_tees)),
989                neg: Box::new(neg.deep_clone(seen_tees)),
990                metadata: metadata.clone(),
991            },
992            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
993                input: Box::new(input.deep_clone(seen_tees)),
994                metadata: metadata.clone(),
995            },
996            HydroNode::ResolveFuturesOrdered { input, metadata } => {
997                HydroNode::ResolveFuturesOrdered {
998                    input: Box::new(input.deep_clone(seen_tees)),
999                    metadata: metadata.clone(),
1000                }
1001            }
1002            HydroNode::Map { f, input, metadata } => HydroNode::Map {
1003                f: f.clone(),
1004                input: Box::new(input.deep_clone(seen_tees)),
1005                metadata: metadata.clone(),
1006            },
1007            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1008                f: f.clone(),
1009                input: Box::new(input.deep_clone(seen_tees)),
1010                metadata: metadata.clone(),
1011            },
1012            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1013                f: f.clone(),
1014                input: Box::new(input.deep_clone(seen_tees)),
1015                metadata: metadata.clone(),
1016            },
1017            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1018                f: f.clone(),
1019                input: Box::new(input.deep_clone(seen_tees)),
1020                metadata: metadata.clone(),
1021            },
1022            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1023                input: Box::new(input.deep_clone(seen_tees)),
1024                metadata: metadata.clone(),
1025            },
1026            HydroNode::Enumerate {
1027                is_static,
1028                input,
1029                metadata,
1030            } => HydroNode::Enumerate {
1031                is_static: *is_static,
1032                input: Box::new(input.deep_clone(seen_tees)),
1033                metadata: metadata.clone(),
1034            },
1035            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1036                f: f.clone(),
1037                input: Box::new(input.deep_clone(seen_tees)),
1038                metadata: metadata.clone(),
1039            },
1040            HydroNode::Unique { input, metadata } => HydroNode::Unique {
1041                input: Box::new(input.deep_clone(seen_tees)),
1042                metadata: metadata.clone(),
1043            },
1044            HydroNode::Sort { input, metadata } => HydroNode::Sort {
1045                input: Box::new(input.deep_clone(seen_tees)),
1046                metadata: metadata.clone(),
1047            },
1048            HydroNode::Fold {
1049                init,
1050                acc,
1051                input,
1052                metadata,
1053            } => HydroNode::Fold {
1054                init: init.clone(),
1055                acc: acc.clone(),
1056                input: Box::new(input.deep_clone(seen_tees)),
1057                metadata: metadata.clone(),
1058            },
1059            HydroNode::FoldKeyed {
1060                init,
1061                acc,
1062                input,
1063                metadata,
1064            } => HydroNode::FoldKeyed {
1065                init: init.clone(),
1066                acc: acc.clone(),
1067                input: Box::new(input.deep_clone(seen_tees)),
1068                metadata: metadata.clone(),
1069            },
1070            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1071                f: f.clone(),
1072                input: Box::new(input.deep_clone(seen_tees)),
1073                metadata: metadata.clone(),
1074            },
1075            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1076                f: f.clone(),
1077                input: Box::new(input.deep_clone(seen_tees)),
1078                metadata: metadata.clone(),
1079            },
1080            HydroNode::Network {
1081                from_key,
1082                to_location,
1083                to_key,
1084                serialize_fn,
1085                instantiate_fn,
1086                deserialize_fn,
1087                input,
1088                metadata,
1089            } => HydroNode::Network {
1090                from_key: *from_key,
1091                to_location: to_location.clone(),
1092                to_key: *to_key,
1093                serialize_fn: serialize_fn.clone(),
1094                instantiate_fn: instantiate_fn.clone(),
1095                deserialize_fn: deserialize_fn.clone(),
1096                input: Box::new(input.deep_clone(seen_tees)),
1097                metadata: metadata.clone(),
1098            },
1099            HydroNode::Counter {
1100                tag,
1101                duration,
1102                input,
1103                metadata,
1104            } => HydroNode::Counter {
1105                tag: tag.clone(),
1106                duration: duration.clone(),
1107                input: Box::new(input.deep_clone(seen_tees)),
1108                metadata: metadata.clone(),
1109            },
1110        }
1111    }
1112
1113    #[cfg(feature = "build")]
1114    pub fn emit_core(
1115        &mut self,
1116        builders_or_callback: &mut BuildersOrCallback<
1117            impl FnMut(&mut HydroLeaf, &mut usize),
1118            impl FnMut(&mut HydroNode, &mut usize),
1119        >,
1120        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1121        next_stmt_id: &mut usize,
1122    ) -> (syn::Ident, usize) {
1123        match self {
1124            HydroNode::Placeholder => {
1125                panic!()
1126            }
1127
1128            HydroNode::Persist { inner, .. } => {
1129                let (inner_ident, location) =
1130                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1131
1132                let persist_ident =
1133                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1134
1135                match builders_or_callback {
1136                    BuildersOrCallback::Builders(graph_builders) => {
1137                        let builder = graph_builders.entry(location).or_default();
1138                        builder.add_dfir(
1139                            parse_quote! {
1140                                #persist_ident = #inner_ident -> persist::<'static>();
1141                            },
1142                            None,
1143                            Some(&next_stmt_id.to_string()),
1144                        );
1145                    }
1146                    BuildersOrCallback::Callback(_, node_callback) => {
1147                        node_callback(self, next_stmt_id);
1148                    }
1149                }
1150
1151                *next_stmt_id += 1;
1152
1153                (persist_ident, location)
1154            }
1155
1156            HydroNode::Unpersist { .. } => {
1157                panic!(
1158                    "Unpersist is a marker node and should have been optimized away. This is likely a compiler bug."
1159                )
1160            }
1161
1162            HydroNode::Delta { inner, .. } => {
1163                let (inner_ident, location) =
1164                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1165
1166                let delta_ident =
1167                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1168
1169                match builders_or_callback {
1170                    BuildersOrCallback::Builders(graph_builders) => {
1171                        let builder = graph_builders.entry(location).or_default();
1172                        builder.add_dfir(
1173                            parse_quote! {
1174                                #delta_ident = #inner_ident -> multiset_delta();
1175                            },
1176                            None,
1177                            Some(&next_stmt_id.to_string()),
1178                        );
1179                    }
1180                    BuildersOrCallback::Callback(_, node_callback) => {
1181                        node_callback(self, next_stmt_id);
1182                    }
1183                }
1184
1185                *next_stmt_id += 1;
1186
1187                (delta_ident, location)
1188            }
1189
1190            HydroNode::Source {
1191                source,
1192                location_kind,
1193                ..
1194            } => {
1195                let location_id = match location_kind.clone() {
1196                    LocationId::Process(id) => id,
1197                    LocationId::Cluster(id) => id,
1198                    LocationId::Tick(_, _) => panic!(),
1199                    LocationId::ExternalProcess(id) => id,
1200                };
1201
1202                if let HydroSource::ExternalNetwork() = source {
1203                    (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1204                } else {
1205                    let source_ident =
1206                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1207
1208                    let source_stmt = match source {
1209                        HydroSource::Stream(expr) => {
1210                            parse_quote! {
1211                                #source_ident = source_stream(#expr);
1212                            }
1213                        }
1214
1215                        HydroSource::ExternalNetwork() => {
1216                            unreachable!()
1217                        }
1218
1219                        HydroSource::Iter(expr) => {
1220                            parse_quote! {
1221                                #source_ident = source_iter(#expr);
1222                            }
1223                        }
1224
1225                        HydroSource::Spin() => {
1226                            parse_quote! {
1227                                #source_ident = spin();
1228                            }
1229                        }
1230                    };
1231
1232                    match builders_or_callback {
1233                        BuildersOrCallback::Builders(graph_builders) => {
1234                            let builder = graph_builders.entry(location_id).or_default();
1235                            builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1236                        }
1237                        BuildersOrCallback::Callback(_, node_callback) => {
1238                            node_callback(self, next_stmt_id);
1239                        }
1240                    }
1241
1242                    *next_stmt_id += 1;
1243
1244                    (source_ident, location_id)
1245                }
1246            }
1247
1248            HydroNode::CycleSource {
1249                ident,
1250                location_kind,
1251                ..
1252            } => {
1253                let location_id = *match location_kind.root() {
1254                    LocationId::Process(id) => id,
1255                    LocationId::Cluster(id) => id,
1256                    LocationId::Tick(_, _) => panic!(),
1257                    LocationId::ExternalProcess(_) => panic!(),
1258                };
1259
1260                let ident = ident.clone();
1261
1262                match builders_or_callback {
1263                    BuildersOrCallback::Builders(_) => {}
1264                    BuildersOrCallback::Callback(_, node_callback) => {
1265                        node_callback(self, next_stmt_id);
1266                    }
1267                }
1268
1269                // consume a stmt id even though we did not emit anything so that we can instrument this
1270                *next_stmt_id += 1;
1271
1272                (ident, location_id)
1273            }
1274
1275            HydroNode::Tee { inner, .. } => {
1276                let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1277                    built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1278                {
1279                    match builders_or_callback {
1280                        BuildersOrCallback::Builders(_) => {}
1281                        BuildersOrCallback::Callback(_, node_callback) => {
1282                            node_callback(self, next_stmt_id);
1283                        }
1284                    }
1285
1286                    (teed_from.clone(), *inner_location_id)
1287                } else {
1288                    let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1289                        builders_or_callback,
1290                        built_tees,
1291                        next_stmt_id,
1292                    );
1293
1294                    let tee_ident =
1295                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1296
1297                    built_tees.insert(
1298                        inner.0.as_ref() as *const RefCell<HydroNode>,
1299                        (tee_ident.clone(), inner_location_id),
1300                    );
1301
1302                    match builders_or_callback {
1303                        BuildersOrCallback::Builders(graph_builders) => {
1304                            let builder = graph_builders.entry(inner_location_id).or_default();
1305                            builder.add_dfir(
1306                                parse_quote! {
1307                                    #tee_ident = #inner_ident -> tee();
1308                                },
1309                                None,
1310                                Some(&next_stmt_id.to_string()),
1311                            );
1312                        }
1313                        BuildersOrCallback::Callback(_, node_callback) => {
1314                            node_callback(self, next_stmt_id);
1315                        }
1316                    }
1317
1318                    (tee_ident, inner_location_id)
1319                };
1320
1321                // we consume a stmt id regardless of if we emit the tee() operator,
1322                // so that during rewrites we touch all recipients of the tee()
1323
1324                *next_stmt_id += 1;
1325                (ret_ident, inner_location_id)
1326            }
1327
1328            HydroNode::Chain { first, second, .. } => {
1329                let (first_ident, first_location_id) =
1330                    first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1331                let (second_ident, second_location_id) =
1332                    second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1333
1334                assert_eq!(
1335                    first_location_id, second_location_id,
1336                    "chain inputs must be in the same location"
1337                );
1338
1339                let chain_ident =
1340                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1341
1342                match builders_or_callback {
1343                    BuildersOrCallback::Builders(graph_builders) => {
1344                        let builder = graph_builders.entry(first_location_id).or_default();
1345                        builder.add_dfir(
1346                            parse_quote! {
1347                                #chain_ident = chain();
1348                                #first_ident -> [0]#chain_ident;
1349                                #second_ident -> [1]#chain_ident;
1350                            },
1351                            None,
1352                            Some(&next_stmt_id.to_string()),
1353                        );
1354                    }
1355                    BuildersOrCallback::Callback(_, node_callback) => {
1356                        node_callback(self, next_stmt_id);
1357                    }
1358                }
1359
1360                *next_stmt_id += 1;
1361
1362                (chain_ident, first_location_id)
1363            }
1364
1365            HydroNode::CrossSingleton { left, right, .. } => {
1366                let (left_ident, left_location_id) =
1367                    left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1368                let (right_ident, right_location_id) =
1369                    right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1370
1371                assert_eq!(
1372                    left_location_id, right_location_id,
1373                    "cross_singleton inputs must be in the same location"
1374                );
1375
1376                let cross_ident =
1377                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1378
1379                match builders_or_callback {
1380                    BuildersOrCallback::Builders(graph_builders) => {
1381                        let builder = graph_builders.entry(left_location_id).or_default();
1382                        builder.add_dfir(
1383                            parse_quote! {
1384                                #cross_ident = cross_singleton();
1385                                #left_ident -> [input]#cross_ident;
1386                                #right_ident -> [single]#cross_ident;
1387                            },
1388                            None,
1389                            Some(&next_stmt_id.to_string()),
1390                        );
1391                    }
1392                    BuildersOrCallback::Callback(_, node_callback) => {
1393                        node_callback(self, next_stmt_id);
1394                    }
1395                }
1396
1397                *next_stmt_id += 1;
1398
1399                (cross_ident, left_location_id)
1400            }
1401
1402            HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1403                let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1404                    parse_quote!(cross_join_multiset)
1405                } else {
1406                    parse_quote!(join_multiset)
1407                };
1408
1409                let (HydroNode::CrossProduct { left, right, .. }
1410                | HydroNode::Join { left, right, .. }) = self
1411                else {
1412                    unreachable!()
1413                };
1414
1415                let (left_inner, left_lifetime) =
1416                    if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1417                        (left, quote!('static))
1418                    } else {
1419                        (left, quote!('tick))
1420                    };
1421
1422                let (right_inner, right_lifetime) =
1423                    if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
1424                        (right, quote!('static))
1425                    } else {
1426                        (right, quote!('tick))
1427                    };
1428
1429                let (left_ident, left_location_id) =
1430                    left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1431                let (right_ident, right_location_id) =
1432                    right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1433
1434                assert_eq!(
1435                    left_location_id, right_location_id,
1436                    "join / cross product inputs must be in the same location"
1437                );
1438
1439                let stream_ident =
1440                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1441
1442                match builders_or_callback {
1443                    BuildersOrCallback::Builders(graph_builders) => {
1444                        let builder = graph_builders.entry(left_location_id).or_default();
1445                        builder.add_dfir(
1446                            parse_quote! {
1447                                #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
1448                                #left_ident -> [0]#stream_ident;
1449                                #right_ident -> [1]#stream_ident;
1450                            },
1451                            None,
1452                            Some(&next_stmt_id.to_string()),
1453                        );
1454                    }
1455                    BuildersOrCallback::Callback(_, node_callback) => {
1456                        node_callback(self, next_stmt_id);
1457                    }
1458                }
1459
1460                *next_stmt_id += 1;
1461
1462                (stream_ident, left_location_id)
1463            }
1464
1465            HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
1466                let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
1467                    parse_quote!(difference_multiset)
1468                } else {
1469                    parse_quote!(anti_join_multiset)
1470                };
1471
1472                let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
1473                    self
1474                else {
1475                    unreachable!()
1476                };
1477
1478                let (neg, neg_lifetime) =
1479                    if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
1480                        (neg, quote!('static))
1481                    } else {
1482                        (neg, quote!('tick))
1483                    };
1484
1485                let (pos_ident, pos_location_id) =
1486                    pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
1487                let (neg_ident, neg_location_id) =
1488                    neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
1489
1490                assert_eq!(
1491                    pos_location_id, neg_location_id,
1492                    "difference / anti join inputs must be in the same location"
1493                );
1494
1495                let stream_ident =
1496                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1497
1498                match builders_or_callback {
1499                    BuildersOrCallback::Builders(graph_builders) => {
1500                        let builder = graph_builders.entry(pos_location_id).or_default();
1501                        builder.add_dfir(
1502                            parse_quote! {
1503                                #stream_ident = #operator::<'tick, #neg_lifetime>();
1504                                #pos_ident -> [pos]#stream_ident;
1505                                #neg_ident -> [neg]#stream_ident;
1506                            },
1507                            None,
1508                            Some(&next_stmt_id.to_string()),
1509                        );
1510                    }
1511                    BuildersOrCallback::Callback(_, node_callback) => {
1512                        node_callback(self, next_stmt_id);
1513                    }
1514                }
1515
1516                *next_stmt_id += 1;
1517
1518                (stream_ident, pos_location_id)
1519            }
1520
1521            HydroNode::ResolveFutures { input, .. } => {
1522                let (input_ident, input_location_id) =
1523                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1524
1525                let futures_ident =
1526                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1527
1528                match builders_or_callback {
1529                    BuildersOrCallback::Builders(graph_builders) => {
1530                        let builder = graph_builders.entry(input_location_id).or_default();
1531                        builder.add_dfir(
1532                            parse_quote! {
1533                                #futures_ident = #input_ident -> resolve_futures();
1534                            },
1535                            None,
1536                            Some(&next_stmt_id.to_string()),
1537                        );
1538                    }
1539                    BuildersOrCallback::Callback(_, node_callback) => {
1540                        node_callback(self, next_stmt_id);
1541                    }
1542                }
1543
1544                *next_stmt_id += 1;
1545
1546                (futures_ident, input_location_id)
1547            }
1548
1549            HydroNode::ResolveFuturesOrdered { input, .. } => {
1550                let (input_ident, input_location_id) =
1551                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1552
1553                let futures_ident =
1554                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1555
1556                match builders_or_callback {
1557                    BuildersOrCallback::Builders(graph_builders) => {
1558                        let builder = graph_builders.entry(input_location_id).or_default();
1559                        builder.add_dfir(
1560                            parse_quote! {
1561                                #futures_ident = #input_ident -> resolve_futures_ordered();
1562                            },
1563                            None,
1564                            Some(&next_stmt_id.to_string()),
1565                        );
1566                    }
1567                    BuildersOrCallback::Callback(_, node_callback) => {
1568                        node_callback(self, next_stmt_id);
1569                    }
1570                }
1571
1572                *next_stmt_id += 1;
1573
1574                (futures_ident, input_location_id)
1575            }
1576
1577            HydroNode::Map { f, input, .. } => {
1578                let (input_ident, input_location_id) =
1579                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1580
1581                let map_ident =
1582                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1583
1584                match builders_or_callback {
1585                    BuildersOrCallback::Builders(graph_builders) => {
1586                        let builder = graph_builders.entry(input_location_id).or_default();
1587                        builder.add_dfir(
1588                            parse_quote! {
1589                                #map_ident = #input_ident -> map(#f);
1590                            },
1591                            None,
1592                            Some(&next_stmt_id.to_string()),
1593                        );
1594                    }
1595                    BuildersOrCallback::Callback(_, node_callback) => {
1596                        node_callback(self, next_stmt_id);
1597                    }
1598                }
1599
1600                *next_stmt_id += 1;
1601
1602                (map_ident, input_location_id)
1603            }
1604
1605            HydroNode::FlatMap { f, input, .. } => {
1606                let (input_ident, input_location_id) =
1607                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1608
1609                let flat_map_ident =
1610                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1611
1612                match builders_or_callback {
1613                    BuildersOrCallback::Builders(graph_builders) => {
1614                        let builder = graph_builders.entry(input_location_id).or_default();
1615                        builder.add_dfir(
1616                            parse_quote! {
1617                                #flat_map_ident = #input_ident -> flat_map(#f);
1618                            },
1619                            None,
1620                            Some(&next_stmt_id.to_string()),
1621                        );
1622                    }
1623                    BuildersOrCallback::Callback(_, node_callback) => {
1624                        node_callback(self, next_stmt_id);
1625                    }
1626                }
1627
1628                *next_stmt_id += 1;
1629
1630                (flat_map_ident, input_location_id)
1631            }
1632
1633            HydroNode::Filter { f, input, .. } => {
1634                let (input_ident, input_location_id) =
1635                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1636
1637                let filter_ident =
1638                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1639
1640                match builders_or_callback {
1641                    BuildersOrCallback::Builders(graph_builders) => {
1642                        let builder = graph_builders.entry(input_location_id).or_default();
1643                        builder.add_dfir(
1644                            parse_quote! {
1645                                #filter_ident = #input_ident -> filter(#f);
1646                            },
1647                            None,
1648                            Some(&next_stmt_id.to_string()),
1649                        );
1650                    }
1651                    BuildersOrCallback::Callback(_, node_callback) => {
1652                        node_callback(self, next_stmt_id);
1653                    }
1654                }
1655
1656                *next_stmt_id += 1;
1657
1658                (filter_ident, input_location_id)
1659            }
1660
1661            HydroNode::FilterMap { f, input, .. } => {
1662                let (input_ident, input_location_id) =
1663                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1664
1665                let filter_map_ident =
1666                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1667
1668                match builders_or_callback {
1669                    BuildersOrCallback::Builders(graph_builders) => {
1670                        let builder = graph_builders.entry(input_location_id).or_default();
1671                        builder.add_dfir(
1672                            parse_quote! {
1673                                #filter_map_ident = #input_ident -> filter_map(#f);
1674                            },
1675                            None,
1676                            Some(&next_stmt_id.to_string()),
1677                        );
1678                    }
1679                    BuildersOrCallback::Callback(_, node_callback) => {
1680                        node_callback(self, next_stmt_id);
1681                    }
1682                }
1683
1684                *next_stmt_id += 1;
1685
1686                (filter_map_ident, input_location_id)
1687            }
1688
1689            HydroNode::Sort { input, .. } => {
1690                let (input_ident, input_location_id) =
1691                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1692
1693                let sort_ident =
1694                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1695
1696                match builders_or_callback {
1697                    BuildersOrCallback::Builders(graph_builders) => {
1698                        let builder = graph_builders.entry(input_location_id).or_default();
1699                        builder.add_dfir(
1700                            parse_quote! {
1701                                #sort_ident = #input_ident -> sort();
1702                            },
1703                            None,
1704                            Some(&next_stmt_id.to_string()),
1705                        );
1706                    }
1707                    BuildersOrCallback::Callback(_, node_callback) => {
1708                        node_callback(self, next_stmt_id);
1709                    }
1710                }
1711
1712                *next_stmt_id += 1;
1713
1714                (sort_ident, input_location_id)
1715            }
1716
1717            HydroNode::DeferTick { input, .. } => {
1718                let (input_ident, input_location_id) =
1719                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1720
1721                let defer_tick_ident =
1722                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1723
1724                match builders_or_callback {
1725                    BuildersOrCallback::Builders(graph_builders) => {
1726                        let builder = graph_builders.entry(input_location_id).or_default();
1727                        builder.add_dfir(
1728                            parse_quote! {
1729                                #defer_tick_ident = #input_ident -> defer_tick_lazy();
1730                            },
1731                            None,
1732                            Some(&next_stmt_id.to_string()),
1733                        );
1734                    }
1735                    BuildersOrCallback::Callback(_, node_callback) => {
1736                        node_callback(self, next_stmt_id);
1737                    }
1738                }
1739
1740                *next_stmt_id += 1;
1741
1742                (defer_tick_ident, input_location_id)
1743            }
1744
1745            HydroNode::Enumerate {
1746                is_static, input, ..
1747            } => {
1748                let (input_ident, input_location_id) =
1749                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1750
1751                let enumerate_ident =
1752                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1753
1754                match builders_or_callback {
1755                    BuildersOrCallback::Builders(graph_builders) => {
1756                        let builder = graph_builders.entry(input_location_id).or_default();
1757                        let lifetime = if *is_static {
1758                            quote!('static)
1759                        } else {
1760                            quote!('tick)
1761                        };
1762                        builder.add_dfir(
1763                            parse_quote! {
1764                                #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
1765                            },
1766                            None,
1767                            Some(&next_stmt_id.to_string()),
1768                        );
1769                    }
1770                    BuildersOrCallback::Callback(_, node_callback) => {
1771                        node_callback(self, next_stmt_id);
1772                    }
1773                }
1774
1775                *next_stmt_id += 1;
1776
1777                (enumerate_ident, input_location_id)
1778            }
1779
1780            HydroNode::Inspect { f, input, .. } => {
1781                let (input_ident, input_location_id) =
1782                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1783
1784                let inspect_ident =
1785                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1786
1787                match builders_or_callback {
1788                    BuildersOrCallback::Builders(graph_builders) => {
1789                        let builder = graph_builders.entry(input_location_id).or_default();
1790                        builder.add_dfir(
1791                            parse_quote! {
1792                                #inspect_ident = #input_ident -> inspect(#f);
1793                            },
1794                            None,
1795                            Some(&next_stmt_id.to_string()),
1796                        );
1797                    }
1798                    BuildersOrCallback::Callback(_, node_callback) => {
1799                        node_callback(self, next_stmt_id);
1800                    }
1801                }
1802
1803                *next_stmt_id += 1;
1804
1805                (inspect_ident, input_location_id)
1806            }
1807
1808            HydroNode::Unique { input, .. } => {
1809                let (input_ident, input_location_id) =
1810                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1811
1812                let unique_ident =
1813                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1814
1815                match builders_or_callback {
1816                    BuildersOrCallback::Builders(graph_builders) => {
1817                        let builder = graph_builders.entry(input_location_id).or_default();
1818                        builder.add_dfir(
1819                            parse_quote! {
1820                                #unique_ident = #input_ident -> unique::<'tick>();
1821                            },
1822                            None,
1823                            Some(&next_stmt_id.to_string()),
1824                        );
1825                    }
1826                    BuildersOrCallback::Callback(_, node_callback) => {
1827                        node_callback(self, next_stmt_id);
1828                    }
1829                }
1830
1831                *next_stmt_id += 1;
1832
1833                (unique_ident, input_location_id)
1834            }
1835
1836            HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } => {
1837                let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
1838                    parse_quote!(fold)
1839                } else {
1840                    parse_quote!(fold_keyed)
1841                };
1842
1843                let (HydroNode::Fold {
1844                    init, acc, input, ..
1845                }
1846                | HydroNode::FoldKeyed {
1847                    init, acc, input, ..
1848                }) = self
1849                else {
1850                    unreachable!()
1851                };
1852
1853                let (input, lifetime) =
1854                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1855                        (input, quote!('static))
1856                    } else {
1857                        (input, quote!('tick))
1858                    };
1859
1860                let (input_ident, input_location_id) =
1861                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1862
1863                let fold_ident =
1864                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1865
1866                match builders_or_callback {
1867                    BuildersOrCallback::Builders(graph_builders) => {
1868                        let builder = graph_builders.entry(input_location_id).or_default();
1869                        builder.add_dfir(
1870                            parse_quote! {
1871                                #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
1872                            },
1873                            None,
1874                            Some(&next_stmt_id.to_string()),
1875                        );
1876                    }
1877                    BuildersOrCallback::Callback(_, node_callback) => {
1878                        node_callback(self, next_stmt_id);
1879                    }
1880                }
1881
1882                *next_stmt_id += 1;
1883
1884                (fold_ident, input_location_id)
1885            }
1886
1887            HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
1888                let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
1889                    parse_quote!(reduce)
1890                } else {
1891                    parse_quote!(reduce_keyed)
1892                };
1893
1894                let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
1895                    self
1896                else {
1897                    unreachable!()
1898                };
1899
1900                let (input, lifetime) =
1901                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1902                        (input, quote!('static))
1903                    } else {
1904                        (input, quote!('tick))
1905                    };
1906
1907                let (input_ident, input_location_id) =
1908                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1909
1910                let reduce_ident =
1911                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1912
1913                match builders_or_callback {
1914                    BuildersOrCallback::Builders(graph_builders) => {
1915                        let builder = graph_builders.entry(input_location_id).or_default();
1916                        builder.add_dfir(
1917                            parse_quote! {
1918                                #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
1919                            },
1920                            None,
1921                            Some(&next_stmt_id.to_string()),
1922                        );
1923                    }
1924                    BuildersOrCallback::Callback(_, node_callback) => {
1925                        node_callback(self, next_stmt_id);
1926                    }
1927                }
1928
1929                *next_stmt_id += 1;
1930
1931                (reduce_ident, input_location_id)
1932            }
1933
1934            HydroNode::Network {
1935                from_key: _,
1936                to_location,
1937                to_key: _,
1938                serialize_fn: serialize_pipeline,
1939                instantiate_fn,
1940                deserialize_fn: deserialize_pipeline,
1941                input,
1942                ..
1943            } => {
1944                let (input_ident, input_location_id) =
1945                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1946
1947                let to_id = match *to_location {
1948                    LocationId::Process(id) => id,
1949                    LocationId::Cluster(id) => id,
1950                    LocationId::Tick(_, _) => panic!(),
1951                    LocationId::ExternalProcess(id) => id,
1952                };
1953
1954                let receiver_stream_ident =
1955                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1956
1957                match builders_or_callback {
1958                    BuildersOrCallback::Builders(graph_builders) => {
1959                        let (sink_expr, source_expr) = match instantiate_fn {
1960                            DebugInstantiate::Building => (
1961                                syn::parse_quote!(DUMMY_SINK),
1962                                syn::parse_quote!(DUMMY_SOURCE),
1963                            ),
1964
1965                            DebugInstantiate::Finalized(finalized) => {
1966                                (finalized.sink.clone(), finalized.source.clone())
1967                            }
1968                        };
1969
1970                        let sender_builder = graph_builders.entry(input_location_id).or_default();
1971                        if let Some(serialize_pipeline) = serialize_pipeline {
1972                            sender_builder.add_dfir(
1973                                parse_quote! {
1974                                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
1975                                },
1976                                None,
1977                                Some(&next_stmt_id.to_string()),
1978                            );
1979                        } else {
1980                            sender_builder.add_dfir(
1981                                parse_quote! {
1982                                    #input_ident -> dest_sink(#sink_expr);
1983                                },
1984                                None,
1985                                Some(&next_stmt_id.to_string()),
1986                            );
1987                        }
1988
1989                        let receiver_builder = graph_builders.entry(to_id).or_default();
1990                        if let Some(deserialize_pipeline) = deserialize_pipeline {
1991                            receiver_builder.add_dfir(parse_quote! {
1992                                #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
1993                            }, None, Some(&next_stmt_id.to_string()));
1994                        } else {
1995                            receiver_builder.add_dfir(
1996                                parse_quote! {
1997                                    #receiver_stream_ident = source_stream(#source_expr);
1998                                },
1999                                None,
2000                                Some(&next_stmt_id.to_string()),
2001                            );
2002                        }
2003                    }
2004                    BuildersOrCallback::Callback(_, node_callback) => {
2005                        node_callback(self, next_stmt_id);
2006                    }
2007                }
2008
2009                *next_stmt_id += 1;
2010
2011                (receiver_stream_ident, to_id)
2012            }
2013
2014            HydroNode::Counter {
2015                tag,
2016                duration,
2017                input,
2018                ..
2019            } => {
2020                let (input_ident, input_location_id) =
2021                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2022
2023                let counter_ident =
2024                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2025
2026                match builders_or_callback {
2027                    BuildersOrCallback::Builders(graph_builders) => {
2028                        let builder = graph_builders.entry(input_location_id).or_default();
2029                        builder.add_dfir(
2030                            parse_quote! {
2031                                #counter_ident = #input_ident -> _counter(#tag, #duration);
2032                            },
2033                            None,
2034                            Some(&next_stmt_id.to_string()),
2035                        );
2036                    }
2037                    BuildersOrCallback::Callback(_, node_callback) => {
2038                        node_callback(self, next_stmt_id);
2039                    }
2040                }
2041
2042                *next_stmt_id += 1;
2043
2044                (counter_ident, input_location_id)
2045            }
2046        }
2047    }
2048
2049    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
2050        match self {
2051            HydroNode::Placeholder => {
2052                panic!()
2053            }
2054            HydroNode::Source { source, .. } => match source {
2055                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
2056                HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
2057            },
2058            HydroNode::CycleSource { .. }
2059            | HydroNode::Tee { .. }
2060            | HydroNode::Persist { .. }
2061            | HydroNode::Unpersist { .. }
2062            | HydroNode::Delta { .. }
2063            | HydroNode::Chain { .. }
2064            | HydroNode::CrossProduct { .. }
2065            | HydroNode::CrossSingleton { .. }
2066            | HydroNode::ResolveFutures { .. }
2067            | HydroNode::ResolveFuturesOrdered { .. }
2068            | HydroNode::Join { .. }
2069            | HydroNode::Difference { .. }
2070            | HydroNode::AntiJoin { .. }
2071            | HydroNode::DeferTick { .. }
2072            | HydroNode::Enumerate { .. }
2073            | HydroNode::Unique { .. }
2074            | HydroNode::Sort { .. } => {}
2075            HydroNode::Map { f, .. }
2076            | HydroNode::FlatMap { f, .. }
2077            | HydroNode::Filter { f, .. }
2078            | HydroNode::FilterMap { f, .. }
2079            | HydroNode::Inspect { f, .. }
2080            | HydroNode::Reduce { f, .. }
2081            | HydroNode::ReduceKeyed { f, .. } => {
2082                transform(f);
2083            }
2084            HydroNode::Fold { init, acc, .. } | HydroNode::FoldKeyed { init, acc, .. } => {
2085                transform(init);
2086                transform(acc);
2087            }
2088            HydroNode::Network {
2089                serialize_fn,
2090                deserialize_fn,
2091                ..
2092            } => {
2093                if let Some(serialize_fn) = serialize_fn {
2094                    transform(serialize_fn);
2095                }
2096                if let Some(deserialize_fn) = deserialize_fn {
2097                    transform(deserialize_fn);
2098                }
2099            }
2100            HydroNode::Counter { duration, .. } => {
2101                transform(duration);
2102            }
2103        }
2104    }
2105
2106    pub fn metadata(&self) -> &HydroIrMetadata {
2107        match self {
2108            HydroNode::Placeholder => {
2109                panic!()
2110            }
2111            HydroNode::Source { metadata, .. } => metadata,
2112            HydroNode::CycleSource { metadata, .. } => metadata,
2113            HydroNode::Tee { metadata, .. } => metadata,
2114            HydroNode::Persist { metadata, .. } => metadata,
2115            HydroNode::Unpersist { metadata, .. } => metadata,
2116            HydroNode::Delta { metadata, .. } => metadata,
2117            HydroNode::Chain { metadata, .. } => metadata,
2118            HydroNode::CrossProduct { metadata, .. } => metadata,
2119            HydroNode::CrossSingleton { metadata, .. } => metadata,
2120            HydroNode::Join { metadata, .. } => metadata,
2121            HydroNode::Difference { metadata, .. } => metadata,
2122            HydroNode::AntiJoin { metadata, .. } => metadata,
2123            HydroNode::ResolveFutures { metadata, .. } => metadata,
2124            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2125            HydroNode::Map { metadata, .. } => metadata,
2126            HydroNode::FlatMap { metadata, .. } => metadata,
2127            HydroNode::Filter { metadata, .. } => metadata,
2128            HydroNode::FilterMap { metadata, .. } => metadata,
2129            HydroNode::DeferTick { metadata, .. } => metadata,
2130            HydroNode::Enumerate { metadata, .. } => metadata,
2131            HydroNode::Inspect { metadata, .. } => metadata,
2132            HydroNode::Unique { metadata, .. } => metadata,
2133            HydroNode::Sort { metadata, .. } => metadata,
2134            HydroNode::Fold { metadata, .. } => metadata,
2135            HydroNode::FoldKeyed { metadata, .. } => metadata,
2136            HydroNode::Reduce { metadata, .. } => metadata,
2137            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2138            HydroNode::Network { metadata, .. } => metadata,
2139            HydroNode::Counter { metadata, .. } => metadata,
2140        }
2141    }
2142
2143    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2144        match self {
2145            HydroNode::Placeholder => {
2146                panic!()
2147            }
2148            HydroNode::Source { metadata, .. } => metadata,
2149            HydroNode::CycleSource { metadata, .. } => metadata,
2150            HydroNode::Tee { metadata, .. } => metadata,
2151            HydroNode::Persist { metadata, .. } => metadata,
2152            HydroNode::Unpersist { metadata, .. } => metadata,
2153            HydroNode::Delta { metadata, .. } => metadata,
2154            HydroNode::Chain { metadata, .. } => metadata,
2155            HydroNode::CrossProduct { metadata, .. } => metadata,
2156            HydroNode::CrossSingleton { metadata, .. } => metadata,
2157            HydroNode::Join { metadata, .. } => metadata,
2158            HydroNode::Difference { metadata, .. } => metadata,
2159            HydroNode::AntiJoin { metadata, .. } => metadata,
2160            HydroNode::ResolveFutures { metadata, .. } => metadata,
2161            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2162            HydroNode::Map { metadata, .. } => metadata,
2163            HydroNode::FlatMap { metadata, .. } => metadata,
2164            HydroNode::Filter { metadata, .. } => metadata,
2165            HydroNode::FilterMap { metadata, .. } => metadata,
2166            HydroNode::DeferTick { metadata, .. } => metadata,
2167            HydroNode::Enumerate { metadata, .. } => metadata,
2168            HydroNode::Inspect { metadata, .. } => metadata,
2169            HydroNode::Unique { metadata, .. } => metadata,
2170            HydroNode::Sort { metadata, .. } => metadata,
2171            HydroNode::Fold { metadata, .. } => metadata,
2172            HydroNode::FoldKeyed { metadata, .. } => metadata,
2173            HydroNode::Reduce { metadata, .. } => metadata,
2174            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2175            HydroNode::Network { metadata, .. } => metadata,
2176            HydroNode::Counter { metadata, .. } => metadata,
2177        }
2178    }
2179
2180    pub fn print_root(&self) -> String {
2181        match self {
2182            HydroNode::Placeholder => {
2183                panic!()
2184            }
2185            HydroNode::Source { source, .. } => format!("Source({:?})", source),
2186            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
2187            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
2188            HydroNode::Persist { .. } => "Persist()".to_string(),
2189            HydroNode::Unpersist { .. } => "Unpersist()".to_string(),
2190            HydroNode::Delta { .. } => "Delta()".to_string(),
2191            HydroNode::Chain { first, second, .. } => {
2192                format!("Chain({}, {})", first.print_root(), second.print_root())
2193            }
2194            HydroNode::CrossProduct { left, right, .. } => {
2195                format!(
2196                    "CrossProduct({}, {})",
2197                    left.print_root(),
2198                    right.print_root()
2199                )
2200            }
2201            HydroNode::CrossSingleton { left, right, .. } => {
2202                format!(
2203                    "CrossSingleton({}, {})",
2204                    left.print_root(),
2205                    right.print_root()
2206                )
2207            }
2208            HydroNode::Join { left, right, .. } => {
2209                format!("Join({}, {})", left.print_root(), right.print_root())
2210            }
2211            HydroNode::Difference { pos, neg, .. } => {
2212                format!("Difference({}, {})", pos.print_root(), neg.print_root())
2213            }
2214            HydroNode::AntiJoin { pos, neg, .. } => {
2215                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
2216            }
2217            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
2218            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
2219            HydroNode::Map { f, .. } => format!("Map({:?})", f),
2220            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
2221            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
2222            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
2223            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
2224            HydroNode::Enumerate { is_static, .. } => format!("Enumerate({:?})", is_static),
2225            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
2226            HydroNode::Unique { .. } => "Unique()".to_string(),
2227            HydroNode::Sort { .. } => "Sort()".to_string(),
2228            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
2229            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
2230            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
2231            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
2232            HydroNode::Network { to_location, .. } => format!("Network(to {:?})", to_location),
2233            HydroNode::Counter { tag, duration, .. } => {
2234                format!("Counter({:?}, {:?})", tag, duration)
2235            }
2236        }
2237    }
2238}
2239
2240#[cfg(feature = "build")]
2241#[expect(clippy::too_many_arguments, reason = "networking internals")]
2242fn instantiate_network<'a, D>(
2243    from_location: &LocationId,
2244    from_key: Option<usize>,
2245    to_location: &LocationId,
2246    to_key: Option<usize>,
2247    nodes: &HashMap<usize, D::Process>,
2248    clusters: &HashMap<usize, D::Cluster>,
2249    externals: &HashMap<usize, D::ExternalProcess>,
2250    compile_env: &D::CompileEnv,
2251) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
2252where
2253    D: Deploy<'a>,
2254{
2255    let ((sink, source), connect_fn) = match (from_location, to_location) {
2256        (LocationId::Process(from), LocationId::Process(to)) => {
2257            let from_node = nodes
2258                .get(from)
2259                .unwrap_or_else(|| {
2260                    panic!("A process used in the graph was not instantiated: {}", from)
2261                })
2262                .clone();
2263            let to_node = nodes
2264                .get(to)
2265                .unwrap_or_else(|| {
2266                    panic!("A process used in the graph was not instantiated: {}", to)
2267                })
2268                .clone();
2269
2270            let sink_port = D::allocate_process_port(&from_node);
2271            let source_port = D::allocate_process_port(&to_node);
2272
2273            (
2274                D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2275                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
2276            )
2277        }
2278        (LocationId::Process(from), LocationId::Cluster(to)) => {
2279            let from_node = nodes
2280                .get(from)
2281                .unwrap_or_else(|| {
2282                    panic!("A process used in the graph was not instantiated: {}", from)
2283                })
2284                .clone();
2285            let to_node = clusters
2286                .get(to)
2287                .unwrap_or_else(|| {
2288                    panic!("A cluster used in the graph was not instantiated: {}", to)
2289                })
2290                .clone();
2291
2292            let sink_port = D::allocate_process_port(&from_node);
2293            let source_port = D::allocate_cluster_port(&to_node);
2294
2295            (
2296                D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2297                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
2298            )
2299        }
2300        (LocationId::Cluster(from), LocationId::Process(to)) => {
2301            let from_node = clusters
2302                .get(from)
2303                .unwrap_or_else(|| {
2304                    panic!("A cluster used in the graph was not instantiated: {}", from)
2305                })
2306                .clone();
2307            let to_node = nodes
2308                .get(to)
2309                .unwrap_or_else(|| {
2310                    panic!("A process used in the graph was not instantiated: {}", to)
2311                })
2312                .clone();
2313
2314            let sink_port = D::allocate_cluster_port(&from_node);
2315            let source_port = D::allocate_process_port(&to_node);
2316
2317            (
2318                D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2319                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
2320            )
2321        }
2322        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
2323            let from_node = clusters
2324                .get(from)
2325                .unwrap_or_else(|| {
2326                    panic!("A cluster used in the graph was not instantiated: {}", from)
2327                })
2328                .clone();
2329            let to_node = clusters
2330                .get(to)
2331                .unwrap_or_else(|| {
2332                    panic!("A cluster used in the graph was not instantiated: {}", to)
2333                })
2334                .clone();
2335
2336            let sink_port = D::allocate_cluster_port(&from_node);
2337            let source_port = D::allocate_cluster_port(&to_node);
2338
2339            (
2340                D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2341                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
2342            )
2343        }
2344        (LocationId::ExternalProcess(from), LocationId::Process(to)) => {
2345            let from_node = externals
2346                .get(from)
2347                .unwrap_or_else(|| {
2348                    panic!(
2349                        "A external used in the graph was not instantiated: {}",
2350                        from
2351                    )
2352                })
2353                .clone();
2354
2355            let to_node = nodes
2356                .get(to)
2357                .unwrap_or_else(|| {
2358                    panic!("A process used in the graph was not instantiated: {}", to)
2359                })
2360                .clone();
2361
2362            let sink_port = D::allocate_external_port(&from_node);
2363            let source_port = D::allocate_process_port(&to_node);
2364
2365            from_node.register(from_key.unwrap(), sink_port.clone());
2366
2367            (
2368                (
2369                    parse_quote!(DUMMY),
2370                    D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2371                ),
2372                D::e2o_connect(&from_node, &sink_port, &to_node, &source_port),
2373            )
2374        }
2375        (LocationId::ExternalProcess(_from), LocationId::Cluster(_to)) => {
2376            todo!("NYI")
2377        }
2378        (LocationId::ExternalProcess(_), LocationId::ExternalProcess(_)) => {
2379            panic!("Cannot send from external to external")
2380        }
2381        (LocationId::Process(from), LocationId::ExternalProcess(to)) => {
2382            let from_node = nodes
2383                .get(from)
2384                .unwrap_or_else(|| {
2385                    panic!("A process used in the graph was not instantiated: {}", from)
2386                })
2387                .clone();
2388
2389            let to_node = externals
2390                .get(to)
2391                .unwrap_or_else(|| {
2392                    panic!("A external used in the graph was not instantiated: {}", to)
2393                })
2394                .clone();
2395
2396            let sink_port = D::allocate_process_port(&from_node);
2397            let source_port = D::allocate_external_port(&to_node);
2398
2399            to_node.register(to_key.unwrap(), source_port.clone());
2400
2401            (
2402                (
2403                    D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
2404                    parse_quote!(DUMMY),
2405                ),
2406                D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
2407            )
2408        }
2409        (LocationId::Cluster(_from), LocationId::ExternalProcess(_to)) => {
2410            todo!("NYI")
2411        }
2412        (LocationId::Tick(_, _), _) => panic!(),
2413        (_, LocationId::Tick(_, _)) => panic!(),
2414    };
2415    (sink, source, connect_fn)
2416}
2417
2418#[cfg(test)]
2419mod test {
2420    use std::mem::size_of;
2421
2422    use super::*;
2423
2424    #[test]
2425    fn hydro_node_size() {
2426        insta::assert_snapshot!(size_of::<HydroNode>(), @"152");
2427    }
2428
2429    #[test]
2430    fn hydro_leaf_size() {
2431        insta::assert_snapshot!(size_of::<HydroLeaf>(), @"128");
2432    }
2433}