hydro_lang/
ir.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21
22#[cfg(feature = "build")]
23use crate::deploy::{Deploy, RegisterPort};
24use crate::location::LocationId;
25
/// Newtype around [`syn::Expr`] whose `Debug` output is the expression's
/// token stream instead of the full syntax-tree dump.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub syn::Expr);
28
29impl From<syn::Expr> for DebugExpr {
30    fn from(expr: syn::Expr) -> DebugExpr {
31        DebugExpr(expr)
32    }
33}
34
35impl Deref for DebugExpr {
36    type Target = syn::Expr;
37
38    fn deref(&self) -> &Self::Target {
39        &self.0
40    }
41}
42
43impl ToTokens for DebugExpr {
44    fn to_tokens(&self, tokens: &mut TokenStream) {
45        self.0.to_tokens(tokens);
46    }
47}
48
49impl Debug for DebugExpr {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        write!(f, "{}", self.0.to_token_stream())
52    }
53}
54
/// Newtype around [`syn::Type`] whose `Debug` output is the type's token
/// stream instead of the full syntax-tree dump.
#[derive(Clone, Hash)]
pub struct DebugType(pub syn::Type);
57
58impl From<syn::Type> for DebugType {
59    fn from(t: syn::Type) -> DebugType {
60        DebugType(t)
61    }
62}
63
64impl Deref for DebugType {
65    type Target = syn::Type;
66
67    fn deref(&self) -> &Self::Target {
68        &self.0
69    }
70}
71
72impl ToTokens for DebugType {
73    fn to_tokens(&self, tokens: &mut TokenStream) {
74        self.0.to_tokens(tokens);
75    }
76}
77
78impl Debug for DebugType {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        write!(f, "{}", self.0.to_token_stream())
81    }
82}
83
/// Instantiation state stored in the `instantiate_fn` field of
/// `HydroNode::Network`.
#[allow(clippy::allow_attributes, reason = "Only triggered on nightly.")]
#[allow(
    clippy::large_enum_variant,
    reason = "`Building` is just equivalent to `None`."
)]
pub enum DebugInstantiate {
    /// Not instantiated yet; `HydroNode::compile_network` replaces this with
    /// `Finalized`.
    Building,
    /// Holds, in order: the sink expression, the source expression, and a
    /// connect callback that `HydroNode::connect_network` takes out and runs.
    Finalized(syn::Expr, syn::Expr, Option<Box<dyn FnOnce()>>),
}
93
94impl Debug for DebugInstantiate {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        write!(f, "<network instantiate>")
97    }
98}
99
100impl Hash for DebugInstantiate {
101    fn hash<H: Hasher>(&self, _state: &mut H) {
102        // Do nothing
103    }
104}
105
106impl Clone for DebugInstantiate {
107    fn clone(&self) -> Self {
108        match self {
109            DebugInstantiate::Building => DebugInstantiate::Building,
110            DebugInstantiate::Finalized(_, _, _) => {
111                panic!("DebugInstantiate::Finalized should not be cloned")
112            }
113        }
114    }
115}
116
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression.
    // NOTE(review): presumably the expression evaluates to an async stream — confirm against the emit code.
    Stream(DebugExpr),
    /// Source with no payload of its own.
    // NOTE(review): presumably fed by an external network connection — confirm.
    ExternalNetwork(),
    /// Source described by an expression.
    // NOTE(review): presumably the expression evaluates to an iterator — confirm against the emit code.
    Iter(DebugExpr),
    /// Source with no payload of its own.
    // NOTE(review): semantics are not visible in this part of the file — confirm.
    Spin(),
}
125
/// Selects what an `emit_core` traversal does at each step: either write DFIR
/// statements into graph builders, or hand the visited item to a callback.
///
/// `L` is the leaf callback type and `N` the node callback type; both receive
/// the item together with the running statement-id counter.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<
    'a,
    L: FnMut(&mut HydroLeaf, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
> {
    /// Graph builders keyed by location id; statements are appended to the
    /// builder of the location they belong to.
    Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
    /// Leaf callback followed by node callback.
    // NOTE(review): the node callback is presumably invoked by `HydroNode::emit_core` — confirm.
    Callback(L, N),
}
135
/// A leaf in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from leaves.
#[derive(Debug, Hash)]
pub enum HydroLeaf {
    /// Emitted as `input -> for_each(f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `input -> dest_sink(sink)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `ident = input`, in the builder of the root location of
    /// `location_kind`.
    CycleSink {
        ident: syn::Ident,
        location_kind: LocationId,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
158
159impl HydroLeaf {
160    #[cfg(feature = "build")]
161    pub fn compile_network<'a, D: Deploy<'a>>(
162        &mut self,
163        compile_env: &D::CompileEnv,
164        seen_tees: &mut SeenTees,
165        seen_tee_locations: &mut SeenTeeLocations,
166        processes: &HashMap<usize, D::Process>,
167        clusters: &HashMap<usize, D::Cluster>,
168        externals: &HashMap<usize, D::ExternalProcess>,
169    ) {
170        self.transform_children(
171            |n, s| {
172                n.compile_network::<D>(
173                    compile_env,
174                    s,
175                    seen_tee_locations,
176                    processes,
177                    clusters,
178                    externals,
179                );
180            },
181            seen_tees,
182        )
183    }
184
185    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
186        self.transform_children(
187            |n, s| {
188                n.connect_network(s);
189            },
190            seen_tees,
191        )
192    }
193
194    pub fn transform_bottom_up(
195        &mut self,
196        transform_leaf: &mut impl FnMut(&mut HydroLeaf),
197        transform_node: &mut impl FnMut(&mut HydroNode),
198        seen_tees: &mut SeenTees,
199    ) {
200        self.transform_children(|n, s| n.transform_bottom_up(transform_node, s), seen_tees);
201
202        transform_leaf(self);
203    }
204
205    pub fn transform_children(
206        &mut self,
207        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
208        seen_tees: &mut SeenTees,
209    ) {
210        match self {
211            HydroLeaf::ForEach { f: _, input, .. }
212            | HydroLeaf::DestSink { sink: _, input, .. }
213            | HydroLeaf::CycleSink {
214                ident: _,
215                location_kind: _,
216                input,
217                ..
218            } => {
219                transform(input, seen_tees);
220            }
221        }
222    }
223
224    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroLeaf {
225        match self {
226            HydroLeaf::ForEach { f, input, metadata } => HydroLeaf::ForEach {
227                f: f.clone(),
228                input: Box::new(input.deep_clone(seen_tees)),
229                metadata: metadata.clone(),
230            },
231            HydroLeaf::DestSink {
232                sink,
233                input,
234                metadata,
235            } => HydroLeaf::DestSink {
236                sink: sink.clone(),
237                input: Box::new(input.deep_clone(seen_tees)),
238                metadata: metadata.clone(),
239            },
240            HydroLeaf::CycleSink {
241                ident,
242                location_kind,
243                input,
244                metadata,
245            } => HydroLeaf::CycleSink {
246                ident: ident.clone(),
247                location_kind: location_kind.clone(),
248                input: Box::new(input.deep_clone(seen_tees)),
249                metadata: metadata.clone(),
250            },
251        }
252    }
253
254    #[cfg(feature = "build")]
255    pub fn emit(
256        &mut self,
257        graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
258        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
259        next_stmt_id: &mut usize,
260    ) {
261        self.emit_core(
262            &mut BuildersOrCallback::Builders::<
263                fn(&mut HydroLeaf, &mut usize),
264                fn(&mut HydroNode, &mut usize),
265            >(graph_builders),
266            built_tees,
267            next_stmt_id,
268        );
269    }
270
271    #[cfg(feature = "build")]
272    pub fn emit_core(
273        &mut self,
274        builders_or_callback: &mut BuildersOrCallback<
275            impl FnMut(&mut HydroLeaf, &mut usize),
276            impl FnMut(&mut HydroNode, &mut usize),
277        >,
278        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
279        next_stmt_id: &mut usize,
280    ) {
281        match self {
282            HydroLeaf::ForEach { f, input, .. } => {
283                let (input_ident, input_location_id) =
284                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
285
286                match builders_or_callback {
287                    BuildersOrCallback::Builders(graph_builders) => {
288                        graph_builders
289                            .entry(input_location_id)
290                            .or_default()
291                            .add_dfir(
292                                parse_quote! {
293                                    #input_ident -> for_each(#f);
294                                },
295                                None,
296                                Some(&next_stmt_id.to_string()),
297                            );
298                    }
299                    BuildersOrCallback::Callback(leaf_callback, _) => {
300                        leaf_callback(self, next_stmt_id);
301                    }
302                }
303
304                *next_stmt_id += 1;
305            }
306
307            HydroLeaf::DestSink { sink, input, .. } => {
308                let (input_ident, input_location_id) =
309                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
310
311                match builders_or_callback {
312                    BuildersOrCallback::Builders(graph_builders) => {
313                        graph_builders
314                            .entry(input_location_id)
315                            .or_default()
316                            .add_dfir(
317                                parse_quote! {
318                                    #input_ident -> dest_sink(#sink);
319                                },
320                                None,
321                                Some(&next_stmt_id.to_string()),
322                            );
323                    }
324                    BuildersOrCallback::Callback(leaf_callback, _) => {
325                        leaf_callback(self, next_stmt_id);
326                    }
327                }
328
329                *next_stmt_id += 1;
330            }
331
332            HydroLeaf::CycleSink {
333                ident,
334                location_kind,
335                input,
336                ..
337            } => {
338                let (input_ident, input_location_id) =
339                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
340
341                let location_id = match location_kind.root() {
342                    LocationId::Process(id) => id,
343                    LocationId::Cluster(id) => id,
344                    LocationId::Tick(_, _) => panic!(),
345                    LocationId::ExternalProcess(_) => panic!(),
346                };
347
348                assert_eq!(
349                    input_location_id, *location_id,
350                    "cycle_sink location mismatch"
351                );
352
353                match builders_or_callback {
354                    BuildersOrCallback::Builders(graph_builders) => {
355                        graph_builders.entry(*location_id).or_default().add_dfir(
356                            parse_quote! {
357                                #ident = #input_ident;
358                            },
359                            None,
360                            None,
361                        );
362                    }
363                    // No ID, no callback
364                    BuildersOrCallback::Callback(_, _) => {}
365                }
366            }
367        }
368    }
369
370    pub fn metadata(&self) -> &HydroIrMetadata {
371        match self {
372            HydroLeaf::ForEach { metadata, .. }
373            | HydroLeaf::DestSink { metadata, .. }
374            | HydroLeaf::CycleSink { metadata, .. } => metadata,
375        }
376    }
377
378    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
379        match self {
380            HydroLeaf::ForEach { metadata, .. }
381            | HydroLeaf::DestSink { metadata, .. }
382            | HydroLeaf::CycleSink { metadata, .. } => metadata,
383        }
384    }
385
386    pub fn print_root(&self) -> String {
387        match self {
388            HydroLeaf::ForEach { f, .. } => format!("ForEach({:?})", f),
389            HydroLeaf::DestSink { sink, .. } => format!("DestSink({:?})", sink),
390            HydroLeaf::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
391        }
392    }
393}
394
/// Emits every leaf of `ir` in order and returns the resulting graph
/// builders keyed by location id.
///
/// The tee cache and the statement-id counter are shared across all leaves.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroLeaf>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut graph_builders = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut stmt_counter = 0;

    ir.iter_mut().for_each(|leaf| {
        leaf.emit(&mut graph_builders, &mut built_tees, &mut stmt_counter);
    });

    graph_builders
}
405
/// Walks `ir` in emission order without building any graph, handing the
/// callbacks to [`HydroLeaf::emit_core`] in `Callback` mode.
///
/// The tee cache and the statement-id counter are shared across all leaves.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroLeaf],
    transform_leaf: impl FnMut(&mut HydroLeaf, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callbacks = BuildersOrCallback::Callback(transform_leaf, transform_node);
    let mut built_tees = HashMap::new();
    let mut stmt_counter = 0;

    for leaf in ir.iter_mut() {
        leaf.emit_core(&mut callbacks, &mut built_tees, &mut stmt_counter);
    }
}
419
420pub fn transform_bottom_up(
421    ir: &mut [HydroLeaf],
422    transform_leaf: &mut impl FnMut(&mut HydroLeaf),
423    transform_node: &mut impl FnMut(&mut HydroNode),
424) {
425    let mut seen_tees = HashMap::new();
426    ir.iter_mut().for_each(|leaf| {
427        leaf.transform_bottom_up(transform_leaf, transform_node, &mut seen_tees);
428    });
429}
430
431pub fn deep_clone(ir: &[HydroLeaf]) -> Vec<HydroLeaf> {
432    let mut seen_tees = HashMap::new();
433    ir.iter()
434        .map(|leaf| leaf.deep_clone(&mut seen_tees))
435        .collect()
436}
437
// Per-thread state used while debug-printing tees.
// `None` means deduplication is off. `Some((next_id, ids))` holds the next id
// to hand out and the id already assigned to each tee cell, keyed by the
// cell's address.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
442
443pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
444    PRINTED_TEES.with(|printed_tees| {
445        let mut printed_tees_mut = printed_tees.borrow_mut();
446        *printed_tees_mut = Some((0, HashMap::new()));
447        drop(printed_tees_mut);
448
449        let ret = f();
450
451        let mut printed_tees_mut = printed_tees.borrow_mut();
452        *printed_tees_mut = None;
453
454        ret
455    })
456}
457
/// Shared, mutable handle to the node feeding a `HydroNode::Tee`. Tees are
/// identified by the address of the shared cell.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
459
460impl Debug for TeeNode {
461    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462        PRINTED_TEES.with(|printed_tees| {
463            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
464            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
465
466            if let Some(printed_tees_mut) = printed_tees_mut {
467                if let Some(existing) = printed_tees_mut
468                    .1
469                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
470                {
471                    write!(f, "<tee {}>", existing)
472                } else {
473                    let next_id = printed_tees_mut.0;
474                    printed_tees_mut.0 += 1;
475                    printed_tees_mut
476                        .1
477                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
478                    drop(printed_tees_mut_borrow);
479                    write!(f, "<tee {}>: ", next_id)?;
480                    Debug::fmt(&self.0.borrow(), f)
481                }
482            } else {
483                drop(printed_tees_mut_borrow);
484                write!(f, "<tee>: ")?;
485                Debug::fmt(&self.0.borrow(), f)
486            }
487        })
488    }
489}
490
491impl Hash for TeeNode {
492    fn hash<H: Hasher>(&self, state: &mut H) {
493        self.0.borrow_mut().hash(state);
494    }
495}
496
/// Annotations stored on every leaf and node of the IR.
///
/// The hand-written `Hash`, `PartialEq` and `Eq` impls for this type ignore
/// all of these fields.
#[derive(Debug, Clone)]
pub struct HydroIrMetadata {
    pub location_kind: LocationId,
    // NOTE(review): presumably the element type produced by the node — confirm.
    pub output_type: Option<DebugType>,
    // NOTE(review): meaning and units of the two fields below are not visible here — confirm.
    pub cardinality: Option<usize>,
    pub cpu_usage: Option<f64>,
}
504
505// HydroIrMetadata shouldn't be used to hash or compare
506impl Hash for HydroIrMetadata {
507    fn hash<H: Hasher>(&self, _: &mut H) {}
508}
509
510impl PartialEq for HydroIrMetadata {
511    fn eq(&self, _: &Self) -> bool {
512        true
513    }
514}
515
// `PartialEq` above always returns `true`, which is trivially a total
// equivalence relation.
impl Eq for HydroIrMetadata {}
517
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
#[allow(clippy::allow_attributes, reason = "Only triggered on nightly.")]
#[allow(clippy::large_enum_variant, reason = "TODO(mingwei):")]
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in stored in a tee cell while the real node is being
    /// transformed or cloned. `transform_children` panics if asked to traverse
    /// one.
    Placeholder,

    Source {
        source: HydroSource,
        location_kind: LocationId,
        metadata: HydroIrMetadata,
    },

    CycleSource {
        ident: syn::Ident,
        location_kind: LocationId,
        metadata: HydroIrMetadata,
    },

    /// A reference to a node shared through a [`TeeNode`] cell. Traversals
    /// use a map keyed by the cell's address so the shared node is processed
    /// only once.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    Persist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unpersist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Delta {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        is_static: bool,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A hop from the location of `input` to `to_location`.
    ///
    /// `instantiate_fn` starts out as `DebugInstantiate::Building`;
    /// `compile_network` replaces it with `Finalized`, and `connect_network`
    /// then runs the stored connect callback.
    Network {
        from_key: Option<usize>,
        to_location: LocationId,
        to_key: Option<usize>,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
681
/// Maps the address of an original tee cell to the cell that replaces it in
/// the transformed or cloned graph.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to the location recorded for it by
/// `HydroNode::compile_network`.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
684
685impl<'a> HydroNode {
686    #[cfg(feature = "build")]
687    pub fn compile_network<D: Deploy<'a>>(
688        &mut self,
689        compile_env: &D::CompileEnv,
690        seen_tees: &mut SeenTees,
691        seen_tee_locations: &mut SeenTeeLocations,
692        nodes: &HashMap<usize, D::Process>,
693        clusters: &HashMap<usize, D::Cluster>,
694        externals: &HashMap<usize, D::ExternalProcess>,
695    ) {
696        let mut curr_location = None;
697
698        self.transform_bottom_up(
699            &mut |n| {
700                if let HydroNode::Network {
701                    from_key,
702                    to_location,
703                    to_key,
704                    instantiate_fn,
705                    ..
706                } = n
707                {
708                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
709                        DebugInstantiate::Building => instantiate_network::<D>(
710                            curr_location.as_ref().unwrap(),
711                            *from_key,
712                            to_location,
713                            *to_key,
714                            nodes,
715                            clusters,
716                            externals,
717                            compile_env,
718                        ),
719
720                        DebugInstantiate::Finalized(_, _, _) => panic!("network already finalized"),
721                    };
722
723                    *instantiate_fn =
724                        DebugInstantiate::Finalized(sink_expr, source_expr, Some(connect_fn));
725                }
726
727                // Calculate location of current node to use as from_location
728                match n {
729                    HydroNode::Network {
730                        to_location: location_kind,
731                        ..
732                    }
733                    | HydroNode::CycleSource { location_kind, .. }
734                    | HydroNode::Source { location_kind, .. } => {
735                        // Unwrap location out of Tick
736                        if let LocationId::Tick(_, tick_loc) = location_kind {
737                            curr_location = Some(*tick_loc.clone());
738                        } else {
739                            curr_location = Some(location_kind.clone());
740                        }
741                    }
742                    HydroNode::Tee { inner, .. } => {
743                        let inner_ref = inner.0.as_ref() as *const RefCell<HydroNode>;
744                        if let Some(tee_location) = seen_tee_locations.get(&inner_ref) {
745                            curr_location = Some(tee_location.clone());
746                        } else {
747                            seen_tee_locations
748                                .insert(inner_ref, curr_location.as_ref().unwrap().clone());
749                        }
750                    }
751                    _ => {}
752                }
753            },
754            seen_tees,
755        );
756    }
757
758    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
759        self.transform_bottom_up(
760            &mut |n| {
761                if let HydroNode::Network { instantiate_fn, .. } = n {
762                    match instantiate_fn {
763                        DebugInstantiate::Building => panic!("network not built"),
764
765                        DebugInstantiate::Finalized(_, _, connect_fn) => {
766                            (connect_fn.take().unwrap())();
767                        }
768                    }
769                }
770            },
771            seen_tees,
772        );
773    }
774
775    pub fn transform_bottom_up(
776        &mut self,
777        transform: &mut impl FnMut(&mut HydroNode),
778        seen_tees: &mut SeenTees,
779    ) {
780        self.transform_children(|n, s| n.transform_bottom_up(transform, s), seen_tees);
781
782        transform(self);
783    }
784
785    #[inline(always)]
786    pub fn transform_children(
787        &mut self,
788        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
789        seen_tees: &mut SeenTees,
790    ) {
791        match self {
792            HydroNode::Placeholder => {
793                panic!();
794            }
795
796            HydroNode::Source { .. } | HydroNode::CycleSource { .. } => {}
797
798            HydroNode::Tee { inner, .. } => {
799                if let Some(transformed) =
800                    seen_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
801                {
802                    *inner = TeeNode(transformed.clone());
803                } else {
804                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
805                    seen_tees.insert(
806                        inner.0.as_ref() as *const RefCell<HydroNode>,
807                        transformed_cell.clone(),
808                    );
809                    let mut orig = inner.0.replace(HydroNode::Placeholder);
810                    transform(&mut orig, seen_tees);
811                    *transformed_cell.borrow_mut() = orig;
812                    *inner = TeeNode(transformed_cell);
813                }
814            }
815
816            HydroNode::Persist { inner, .. }
817            | HydroNode::Unpersist { inner, .. }
818            | HydroNode::Delta { inner, .. } => {
819                transform(inner.as_mut(), seen_tees);
820            }
821
822            HydroNode::Chain { first, second, .. } => {
823                transform(first.as_mut(), seen_tees);
824                transform(second.as_mut(), seen_tees);
825            }
826
827            HydroNode::CrossSingleton { left, right, .. }
828            | HydroNode::CrossProduct { left, right, .. }
829            | HydroNode::Join { left, right, .. } => {
830                transform(left.as_mut(), seen_tees);
831                transform(right.as_mut(), seen_tees);
832            }
833
834            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
835                transform(pos.as_mut(), seen_tees);
836                transform(neg.as_mut(), seen_tees);
837            }
838
839            HydroNode::Map { input, .. }
840            | HydroNode::FlatMap { input, .. }
841            | HydroNode::Filter { input, .. }
842            | HydroNode::FilterMap { input, .. }
843            | HydroNode::Sort { input, .. }
844            | HydroNode::DeferTick { input, .. }
845            | HydroNode::Enumerate { input, .. }
846            | HydroNode::Inspect { input, .. }
847            | HydroNode::Unique { input, .. }
848            | HydroNode::Network { input, .. }
849            | HydroNode::Fold { input, .. }
850            | HydroNode::FoldKeyed { input, .. }
851            | HydroNode::Reduce { input, .. }
852            | HydroNode::ReduceKeyed { input, .. }
853            | HydroNode::Counter { input, .. } => {
854                transform(input.as_mut(), seen_tees);
855            }
856        }
857    }
858
859    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
860        match self {
861            HydroNode::Placeholder => HydroNode::Placeholder,
862            HydroNode::Source {
863                source,
864                location_kind,
865                metadata,
866            } => HydroNode::Source {
867                source: source.clone(),
868                location_kind: location_kind.clone(),
869                metadata: metadata.clone(),
870            },
871            HydroNode::CycleSource {
872                ident,
873                location_kind,
874                metadata,
875            } => HydroNode::CycleSource {
876                ident: ident.clone(),
877                location_kind: location_kind.clone(),
878                metadata: metadata.clone(),
879            },
880            HydroNode::Tee { inner, metadata } => {
881                if let Some(transformed) =
882                    seen_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
883                {
884                    HydroNode::Tee {
885                        inner: TeeNode(transformed.clone()),
886                        metadata: metadata.clone(),
887                    }
888                } else {
889                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
890                    seen_tees.insert(
891                        inner.0.as_ref() as *const RefCell<HydroNode>,
892                        new_rc.clone(),
893                    );
894                    let cloned = inner.0.borrow().deep_clone(seen_tees);
895                    *new_rc.borrow_mut() = cloned;
896                    HydroNode::Tee {
897                        inner: TeeNode(new_rc),
898                        metadata: metadata.clone(),
899                    }
900                }
901            }
902            HydroNode::Persist { inner, metadata } => HydroNode::Persist {
903                inner: Box::new(inner.deep_clone(seen_tees)),
904                metadata: metadata.clone(),
905            },
906            HydroNode::Unpersist { inner, metadata } => HydroNode::Unpersist {
907                inner: Box::new(inner.deep_clone(seen_tees)),
908                metadata: metadata.clone(),
909            },
910            HydroNode::Delta { inner, metadata } => HydroNode::Delta {
911                inner: Box::new(inner.deep_clone(seen_tees)),
912                metadata: metadata.clone(),
913            },
914            HydroNode::Chain {
915                first,
916                second,
917                metadata,
918            } => HydroNode::Chain {
919                first: Box::new(first.deep_clone(seen_tees)),
920                second: Box::new(second.deep_clone(seen_tees)),
921                metadata: metadata.clone(),
922            },
923            HydroNode::CrossProduct {
924                left,
925                right,
926                metadata,
927            } => HydroNode::CrossProduct {
928                left: Box::new(left.deep_clone(seen_tees)),
929                right: Box::new(right.deep_clone(seen_tees)),
930                metadata: metadata.clone(),
931            },
932            HydroNode::CrossSingleton {
933                left,
934                right,
935                metadata,
936            } => HydroNode::CrossSingleton {
937                left: Box::new(left.deep_clone(seen_tees)),
938                right: Box::new(right.deep_clone(seen_tees)),
939                metadata: metadata.clone(),
940            },
941            HydroNode::Join {
942                left,
943                right,
944                metadata,
945            } => HydroNode::Join {
946                left: Box::new(left.deep_clone(seen_tees)),
947                right: Box::new(right.deep_clone(seen_tees)),
948                metadata: metadata.clone(),
949            },
950            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
951                pos: Box::new(pos.deep_clone(seen_tees)),
952                neg: Box::new(neg.deep_clone(seen_tees)),
953                metadata: metadata.clone(),
954            },
955            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
956                pos: Box::new(pos.deep_clone(seen_tees)),
957                neg: Box::new(neg.deep_clone(seen_tees)),
958                metadata: metadata.clone(),
959            },
960            HydroNode::Map { f, input, metadata } => HydroNode::Map {
961                f: f.clone(),
962                input: Box::new(input.deep_clone(seen_tees)),
963                metadata: metadata.clone(),
964            },
965            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
966                f: f.clone(),
967                input: Box::new(input.deep_clone(seen_tees)),
968                metadata: metadata.clone(),
969            },
970            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
971                f: f.clone(),
972                input: Box::new(input.deep_clone(seen_tees)),
973                metadata: metadata.clone(),
974            },
975            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
976                f: f.clone(),
977                input: Box::new(input.deep_clone(seen_tees)),
978                metadata: metadata.clone(),
979            },
980            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
981                input: Box::new(input.deep_clone(seen_tees)),
982                metadata: metadata.clone(),
983            },
984            HydroNode::Enumerate {
985                is_static,
986                input,
987                metadata,
988            } => HydroNode::Enumerate {
989                is_static: *is_static,
990                input: Box::new(input.deep_clone(seen_tees)),
991                metadata: metadata.clone(),
992            },
993            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
994                f: f.clone(),
995                input: Box::new(input.deep_clone(seen_tees)),
996                metadata: metadata.clone(),
997            },
998            HydroNode::Unique { input, metadata } => HydroNode::Unique {
999                input: Box::new(input.deep_clone(seen_tees)),
1000                metadata: metadata.clone(),
1001            },
1002            HydroNode::Sort { input, metadata } => HydroNode::Sort {
1003                input: Box::new(input.deep_clone(seen_tees)),
1004                metadata: metadata.clone(),
1005            },
1006            HydroNode::Fold {
1007                init,
1008                acc,
1009                input,
1010                metadata,
1011            } => HydroNode::Fold {
1012                init: init.clone(),
1013                acc: acc.clone(),
1014                input: Box::new(input.deep_clone(seen_tees)),
1015                metadata: metadata.clone(),
1016            },
1017            HydroNode::FoldKeyed {
1018                init,
1019                acc,
1020                input,
1021                metadata,
1022            } => HydroNode::FoldKeyed {
1023                init: init.clone(),
1024                acc: acc.clone(),
1025                input: Box::new(input.deep_clone(seen_tees)),
1026                metadata: metadata.clone(),
1027            },
1028            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1029                f: f.clone(),
1030                input: Box::new(input.deep_clone(seen_tees)),
1031                metadata: metadata.clone(),
1032            },
1033            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1034                f: f.clone(),
1035                input: Box::new(input.deep_clone(seen_tees)),
1036                metadata: metadata.clone(),
1037            },
1038            HydroNode::Network {
1039                from_key,
1040                to_location,
1041                to_key,
1042                serialize_fn,
1043                instantiate_fn,
1044                deserialize_fn,
1045                input,
1046                metadata,
1047            } => HydroNode::Network {
1048                from_key: *from_key,
1049                to_location: to_location.clone(),
1050                to_key: *to_key,
1051                serialize_fn: serialize_fn.clone(),
1052                instantiate_fn: instantiate_fn.clone(),
1053                deserialize_fn: deserialize_fn.clone(),
1054                input: Box::new(input.deep_clone(seen_tees)),
1055                metadata: metadata.clone(),
1056            },
1057            HydroNode::Counter {
1058                tag,
1059                duration,
1060                input,
1061                metadata,
1062            } => HydroNode::Counter {
1063                tag: tag.clone(),
1064                duration: duration.clone(),
1065                input: Box::new(input.deep_clone(seen_tees)),
1066                metadata: metadata.clone(),
1067            },
1068        }
1069    }
1070
1071    #[cfg(feature = "build")]
1072    pub fn emit_core(
1073        &mut self,
1074        builders_or_callback: &mut BuildersOrCallback<
1075            impl FnMut(&mut HydroLeaf, &mut usize),
1076            impl FnMut(&mut HydroNode, &mut usize),
1077        >,
1078        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1079        next_stmt_id: &mut usize,
1080    ) -> (syn::Ident, usize) {
1081        match self {
1082            HydroNode::Placeholder => {
1083                panic!()
1084            }
1085
1086            HydroNode::Persist { inner, .. } => {
1087                let (inner_ident, location) =
1088                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1089
1090                let persist_ident =
1091                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1092
1093                match builders_or_callback {
1094                    BuildersOrCallback::Builders(graph_builders) => {
1095                        let builder = graph_builders.entry(location).or_default();
1096                        builder.add_dfir(
1097                            parse_quote! {
1098                                #persist_ident = #inner_ident -> persist::<'static>();
1099                            },
1100                            None,
1101                            Some(&next_stmt_id.to_string()),
1102                        );
1103                    }
1104                    BuildersOrCallback::Callback(_, node_callback) => {
1105                        node_callback(self, next_stmt_id);
1106                    }
1107                }
1108
1109                *next_stmt_id += 1;
1110
1111                (persist_ident, location)
1112            }
1113
1114            HydroNode::Unpersist { .. } => {
1115                panic!(
1116                    "Unpersist is a marker node and should have been optimized away. This is likely a compiler bug."
1117                )
1118            }
1119
1120            HydroNode::Delta { inner, .. } => {
1121                let (inner_ident, location) =
1122                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1123
1124                let delta_ident =
1125                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1126
1127                match builders_or_callback {
1128                    BuildersOrCallback::Builders(graph_builders) => {
1129                        let builder = graph_builders.entry(location).or_default();
1130                        builder.add_dfir(
1131                            parse_quote! {
1132                                #delta_ident = #inner_ident -> multiset_delta();
1133                            },
1134                            None,
1135                            Some(&next_stmt_id.to_string()),
1136                        );
1137                    }
1138                    BuildersOrCallback::Callback(_, node_callback) => {
1139                        node_callback(self, next_stmt_id);
1140                    }
1141                }
1142
1143                *next_stmt_id += 1;
1144
1145                (delta_ident, location)
1146            }
1147
1148            HydroNode::Source {
1149                source,
1150                location_kind,
1151                ..
1152            } => {
1153                let location_id = match location_kind.clone() {
1154                    LocationId::Process(id) => id,
1155                    LocationId::Cluster(id) => id,
1156                    LocationId::Tick(_, _) => panic!(),
1157                    LocationId::ExternalProcess(id) => id,
1158                };
1159
1160                if let HydroSource::ExternalNetwork() = source {
1161                    (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1162                } else {
1163                    let source_ident =
1164                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1165
1166                    let source_stmt = match source {
1167                        HydroSource::Stream(expr) => {
1168                            parse_quote! {
1169                                #source_ident = source_stream(#expr);
1170                            }
1171                        }
1172
1173                        HydroSource::ExternalNetwork() => {
1174                            unreachable!()
1175                        }
1176
1177                        HydroSource::Iter(expr) => {
1178                            parse_quote! {
1179                                #source_ident = source_iter(#expr);
1180                            }
1181                        }
1182
1183                        HydroSource::Spin() => {
1184                            parse_quote! {
1185                                #source_ident = spin();
1186                            }
1187                        }
1188                    };
1189
1190                    match builders_or_callback {
1191                        BuildersOrCallback::Builders(graph_builders) => {
1192                            let builder = graph_builders.entry(location_id).or_default();
1193                            builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1194                        }
1195                        BuildersOrCallback::Callback(_, node_callback) => {
1196                            node_callback(self, next_stmt_id);
1197                        }
1198                    }
1199
1200                    *next_stmt_id += 1;
1201
1202                    (source_ident, location_id)
1203                }
1204            }
1205
1206            HydroNode::CycleSource {
1207                ident,
1208                location_kind,
1209                ..
1210            } => {
1211                let location_id = *match location_kind.root() {
1212                    LocationId::Process(id) => id,
1213                    LocationId::Cluster(id) => id,
1214                    LocationId::Tick(_, _) => panic!(),
1215                    LocationId::ExternalProcess(_) => panic!(),
1216                };
1217
1218                let ident = ident.clone();
1219
1220                match builders_or_callback {
1221                    BuildersOrCallback::Builders(_) => {}
1222                    BuildersOrCallback::Callback(_, node_callback) => {
1223                        node_callback(self, next_stmt_id);
1224                    }
1225                }
1226
1227                // consume a stmt id even though we did not emit anything so that we can instrument this
1228                *next_stmt_id += 1;
1229
1230                (ident, location_id)
1231            }
1232
1233            HydroNode::Tee { inner, .. } => {
1234                let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1235                    built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1236                {
1237                    match builders_or_callback {
1238                        BuildersOrCallback::Builders(_) => {}
1239                        BuildersOrCallback::Callback(_, node_callback) => {
1240                            node_callback(self, next_stmt_id);
1241                        }
1242                    }
1243
1244                    (teed_from.clone(), *inner_location_id)
1245                } else {
1246                    let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1247                        builders_or_callback,
1248                        built_tees,
1249                        next_stmt_id,
1250                    );
1251
1252                    let tee_ident =
1253                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1254
1255                    built_tees.insert(
1256                        inner.0.as_ref() as *const RefCell<HydroNode>,
1257                        (tee_ident.clone(), inner_location_id),
1258                    );
1259
1260                    match builders_or_callback {
1261                        BuildersOrCallback::Builders(graph_builders) => {
1262                            let builder = graph_builders.entry(inner_location_id).or_default();
1263                            builder.add_dfir(
1264                                parse_quote! {
1265                                    #tee_ident = #inner_ident -> tee();
1266                                },
1267                                None,
1268                                Some(&next_stmt_id.to_string()),
1269                            );
1270                        }
1271                        BuildersOrCallback::Callback(_, node_callback) => {
1272                            node_callback(self, next_stmt_id);
1273                        }
1274                    }
1275
1276                    (tee_ident, inner_location_id)
1277                };
1278
1279                // we consume a stmt id regardless of if we emit the tee() operator,
1280                // so that during rewrites we touch all recipients of the tee()
1281
1282                *next_stmt_id += 1;
1283                (ret_ident, inner_location_id)
1284            }
1285
1286            HydroNode::Chain { first, second, .. } => {
1287                let (first_ident, first_location_id) =
1288                    first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1289                let (second_ident, second_location_id) =
1290                    second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1291
1292                assert_eq!(
1293                    first_location_id, second_location_id,
1294                    "chain inputs must be in the same location"
1295                );
1296
1297                let chain_ident =
1298                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1299
1300                match builders_or_callback {
1301                    BuildersOrCallback::Builders(graph_builders) => {
1302                        let builder = graph_builders.entry(first_location_id).or_default();
1303                        builder.add_dfir(
1304                            parse_quote! {
1305                                #chain_ident = chain();
1306                                #first_ident -> [0]#chain_ident;
1307                                #second_ident -> [1]#chain_ident;
1308                            },
1309                            None,
1310                            Some(&next_stmt_id.to_string()),
1311                        );
1312                    }
1313                    BuildersOrCallback::Callback(_, node_callback) => {
1314                        node_callback(self, next_stmt_id);
1315                    }
1316                }
1317
1318                *next_stmt_id += 1;
1319
1320                (chain_ident, first_location_id)
1321            }
1322
1323            HydroNode::CrossSingleton { left, right, .. } => {
1324                let (left_ident, left_location_id) =
1325                    left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1326                let (right_ident, right_location_id) =
1327                    right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1328
1329                assert_eq!(
1330                    left_location_id, right_location_id,
1331                    "cross_singleton inputs must be in the same location"
1332                );
1333
1334                let cross_ident =
1335                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1336
1337                match builders_or_callback {
1338                    BuildersOrCallback::Builders(graph_builders) => {
1339                        let builder = graph_builders.entry(left_location_id).or_default();
1340                        builder.add_dfir(
1341                            parse_quote! {
1342                                #cross_ident = cross_singleton();
1343                                #left_ident -> [input]#cross_ident;
1344                                #right_ident -> [single]#cross_ident;
1345                            },
1346                            None,
1347                            Some(&next_stmt_id.to_string()),
1348                        );
1349                    }
1350                    BuildersOrCallback::Callback(_, node_callback) => {
1351                        node_callback(self, next_stmt_id);
1352                    }
1353                }
1354
1355                *next_stmt_id += 1;
1356
1357                (cross_ident, left_location_id)
1358            }
1359
1360            HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1361                let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1362                    parse_quote!(cross_join_multiset)
1363                } else {
1364                    parse_quote!(join_multiset)
1365                };
1366
1367                let (HydroNode::CrossProduct { left, right, .. }
1368                | HydroNode::Join { left, right, .. }) = self
1369                else {
1370                    unreachable!()
1371                };
1372
1373                let (left_inner, left_lifetime) =
1374                    if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1375                        (left, quote!('static))
1376                    } else {
1377                        (left, quote!('tick))
1378                    };
1379
1380                let (right_inner, right_lifetime) =
1381                    if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
1382                        (right, quote!('static))
1383                    } else {
1384                        (right, quote!('tick))
1385                    };
1386
1387                let (left_ident, left_location_id) =
1388                    left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1389                let (right_ident, right_location_id) =
1390                    right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1391
1392                assert_eq!(
1393                    left_location_id, right_location_id,
1394                    "join / cross product inputs must be in the same location"
1395                );
1396
1397                let stream_ident =
1398                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1399
1400                match builders_or_callback {
1401                    BuildersOrCallback::Builders(graph_builders) => {
1402                        let builder = graph_builders.entry(left_location_id).or_default();
1403                        builder.add_dfir(
1404                            parse_quote! {
1405                                #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
1406                                #left_ident -> [0]#stream_ident;
1407                                #right_ident -> [1]#stream_ident;
1408                            },
1409                            None,
1410                            Some(&next_stmt_id.to_string()),
1411                        );
1412                    }
1413                    BuildersOrCallback::Callback(_, node_callback) => {
1414                        node_callback(self, next_stmt_id);
1415                    }
1416                }
1417
1418                *next_stmt_id += 1;
1419
1420                (stream_ident, left_location_id)
1421            }
1422
1423            HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
1424                let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
1425                    parse_quote!(difference_multiset)
1426                } else {
1427                    parse_quote!(anti_join_multiset)
1428                };
1429
1430                let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
1431                    self
1432                else {
1433                    unreachable!()
1434                };
1435
1436                let (neg, neg_lifetime) =
1437                    if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
1438                        (neg, quote!('static))
1439                    } else {
1440                        (neg, quote!('tick))
1441                    };
1442
1443                let (pos_ident, pos_location_id) =
1444                    pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
1445                let (neg_ident, neg_location_id) =
1446                    neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
1447
1448                assert_eq!(
1449                    pos_location_id, neg_location_id,
1450                    "difference / anti join inputs must be in the same location"
1451                );
1452
1453                let stream_ident =
1454                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1455
1456                match builders_or_callback {
1457                    BuildersOrCallback::Builders(graph_builders) => {
1458                        let builder = graph_builders.entry(pos_location_id).or_default();
1459                        builder.add_dfir(
1460                            parse_quote! {
1461                                #stream_ident = #operator::<'tick, #neg_lifetime>();
1462                                #pos_ident -> [pos]#stream_ident;
1463                                #neg_ident -> [neg]#stream_ident;
1464                            },
1465                            None,
1466                            Some(&next_stmt_id.to_string()),
1467                        );
1468                    }
1469                    BuildersOrCallback::Callback(_, node_callback) => {
1470                        node_callback(self, next_stmt_id);
1471                    }
1472                }
1473
1474                *next_stmt_id += 1;
1475
1476                (stream_ident, pos_location_id)
1477            }
1478
1479            HydroNode::Map { f, input, .. } => {
1480                let (input_ident, input_location_id) =
1481                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1482
1483                let map_ident =
1484                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1485
1486                match builders_or_callback {
1487                    BuildersOrCallback::Builders(graph_builders) => {
1488                        let builder = graph_builders.entry(input_location_id).or_default();
1489                        builder.add_dfir(
1490                            parse_quote! {
1491                                #map_ident = #input_ident -> map(#f);
1492                            },
1493                            None,
1494                            Some(&next_stmt_id.to_string()),
1495                        );
1496                    }
1497                    BuildersOrCallback::Callback(_, node_callback) => {
1498                        node_callback(self, next_stmt_id);
1499                    }
1500                }
1501
1502                *next_stmt_id += 1;
1503
1504                (map_ident, input_location_id)
1505            }
1506
1507            HydroNode::FlatMap { f, input, .. } => {
1508                let (input_ident, input_location_id) =
1509                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1510
1511                let flat_map_ident =
1512                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1513
1514                match builders_or_callback {
1515                    BuildersOrCallback::Builders(graph_builders) => {
1516                        let builder = graph_builders.entry(input_location_id).or_default();
1517                        builder.add_dfir(
1518                            parse_quote! {
1519                                #flat_map_ident = #input_ident -> flat_map(#f);
1520                            },
1521                            None,
1522                            Some(&next_stmt_id.to_string()),
1523                        );
1524                    }
1525                    BuildersOrCallback::Callback(_, node_callback) => {
1526                        node_callback(self, next_stmt_id);
1527                    }
1528                }
1529
1530                *next_stmt_id += 1;
1531
1532                (flat_map_ident, input_location_id)
1533            }
1534
1535            HydroNode::Filter { f, input, .. } => {
1536                let (input_ident, input_location_id) =
1537                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1538
1539                let filter_ident =
1540                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1541
1542                match builders_or_callback {
1543                    BuildersOrCallback::Builders(graph_builders) => {
1544                        let builder = graph_builders.entry(input_location_id).or_default();
1545                        builder.add_dfir(
1546                            parse_quote! {
1547                                #filter_ident = #input_ident -> filter(#f);
1548                            },
1549                            None,
1550                            Some(&next_stmt_id.to_string()),
1551                        );
1552                    }
1553                    BuildersOrCallback::Callback(_, node_callback) => {
1554                        node_callback(self, next_stmt_id);
1555                    }
1556                }
1557
1558                *next_stmt_id += 1;
1559
1560                (filter_ident, input_location_id)
1561            }
1562
1563            HydroNode::FilterMap { f, input, .. } => {
1564                let (input_ident, input_location_id) =
1565                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1566
1567                let filter_map_ident =
1568                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1569
1570                match builders_or_callback {
1571                    BuildersOrCallback::Builders(graph_builders) => {
1572                        let builder = graph_builders.entry(input_location_id).or_default();
1573                        builder.add_dfir(
1574                            parse_quote! {
1575                                #filter_map_ident = #input_ident -> filter_map(#f);
1576                            },
1577                            None,
1578                            Some(&next_stmt_id.to_string()),
1579                        );
1580                    }
1581                    BuildersOrCallback::Callback(_, node_callback) => {
1582                        node_callback(self, next_stmt_id);
1583                    }
1584                }
1585
1586                *next_stmt_id += 1;
1587
1588                (filter_map_ident, input_location_id)
1589            }
1590
1591            HydroNode::Sort { input, .. } => {
1592                let (input_ident, input_location_id) =
1593                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1594
1595                let sort_ident =
1596                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1597
1598                match builders_or_callback {
1599                    BuildersOrCallback::Builders(graph_builders) => {
1600                        let builder = graph_builders.entry(input_location_id).or_default();
1601                        builder.add_dfir(
1602                            parse_quote! {
1603                                #sort_ident = #input_ident -> sort();
1604                            },
1605                            None,
1606                            Some(&next_stmt_id.to_string()),
1607                        );
1608                    }
1609                    BuildersOrCallback::Callback(_, node_callback) => {
1610                        node_callback(self, next_stmt_id);
1611                    }
1612                }
1613
1614                *next_stmt_id += 1;
1615
1616                (sort_ident, input_location_id)
1617            }
1618
1619            HydroNode::DeferTick { input, .. } => {
1620                let (input_ident, input_location_id) =
1621                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1622
1623                let defer_tick_ident =
1624                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1625
1626                match builders_or_callback {
1627                    BuildersOrCallback::Builders(graph_builders) => {
1628                        let builder = graph_builders.entry(input_location_id).or_default();
1629                        builder.add_dfir(
1630                            parse_quote! {
1631                                #defer_tick_ident = #input_ident -> defer_tick_lazy();
1632                            },
1633                            None,
1634                            Some(&next_stmt_id.to_string()),
1635                        );
1636                    }
1637                    BuildersOrCallback::Callback(_, node_callback) => {
1638                        node_callback(self, next_stmt_id);
1639                    }
1640                }
1641
1642                *next_stmt_id += 1;
1643
1644                (defer_tick_ident, input_location_id)
1645            }
1646
1647            HydroNode::Enumerate {
1648                is_static, input, ..
1649            } => {
1650                let (input_ident, input_location_id) =
1651                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1652
1653                let enumerate_ident =
1654                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1655
1656                match builders_or_callback {
1657                    BuildersOrCallback::Builders(graph_builders) => {
1658                        let builder = graph_builders.entry(input_location_id).or_default();
1659                        let lifetime = if *is_static {
1660                            quote!('static)
1661                        } else {
1662                            quote!('tick)
1663                        };
1664                        builder.add_dfir(
1665                            parse_quote! {
1666                                #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
1667                            },
1668                            None,
1669                            Some(&next_stmt_id.to_string()),
1670                        );
1671                    }
1672                    BuildersOrCallback::Callback(_, node_callback) => {
1673                        node_callback(self, next_stmt_id);
1674                    }
1675                }
1676
1677                *next_stmt_id += 1;
1678
1679                (enumerate_ident, input_location_id)
1680            }
1681
1682            HydroNode::Inspect { f, input, .. } => {
1683                let (input_ident, input_location_id) =
1684                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1685
1686                let inspect_ident =
1687                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1688
1689                match builders_or_callback {
1690                    BuildersOrCallback::Builders(graph_builders) => {
1691                        let builder = graph_builders.entry(input_location_id).or_default();
1692                        builder.add_dfir(
1693                            parse_quote! {
1694                                #inspect_ident = #input_ident -> inspect(#f);
1695                            },
1696                            None,
1697                            Some(&next_stmt_id.to_string()),
1698                        );
1699                    }
1700                    BuildersOrCallback::Callback(_, node_callback) => {
1701                        node_callback(self, next_stmt_id);
1702                    }
1703                }
1704
1705                *next_stmt_id += 1;
1706
1707                (inspect_ident, input_location_id)
1708            }
1709
1710            HydroNode::Unique { input, .. } => {
1711                let (input_ident, input_location_id) =
1712                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1713
1714                let unique_ident =
1715                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1716
1717                match builders_or_callback {
1718                    BuildersOrCallback::Builders(graph_builders) => {
1719                        let builder = graph_builders.entry(input_location_id).or_default();
1720                        builder.add_dfir(
1721                            parse_quote! {
1722                                #unique_ident = #input_ident -> unique::<'tick>();
1723                            },
1724                            None,
1725                            Some(&next_stmt_id.to_string()),
1726                        );
1727                    }
1728                    BuildersOrCallback::Callback(_, node_callback) => {
1729                        node_callback(self, next_stmt_id);
1730                    }
1731                }
1732
1733                *next_stmt_id += 1;
1734
1735                (unique_ident, input_location_id)
1736            }
1737
1738            HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } => {
1739                let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
1740                    parse_quote!(fold)
1741                } else {
1742                    parse_quote!(fold_keyed)
1743                };
1744
1745                let (HydroNode::Fold {
1746                    init, acc, input, ..
1747                }
1748                | HydroNode::FoldKeyed {
1749                    init, acc, input, ..
1750                }) = self
1751                else {
1752                    unreachable!()
1753                };
1754
1755                let (input, lifetime) =
1756                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1757                        (input, quote!('static))
1758                    } else {
1759                        (input, quote!('tick))
1760                    };
1761
1762                let (input_ident, input_location_id) =
1763                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1764
1765                let fold_ident =
1766                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1767
1768                match builders_or_callback {
1769                    BuildersOrCallback::Builders(graph_builders) => {
1770                        let builder = graph_builders.entry(input_location_id).or_default();
1771                        builder.add_dfir(
1772                            parse_quote! {
1773                                #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
1774                            },
1775                            None,
1776                            Some(&next_stmt_id.to_string()),
1777                        );
1778                    }
1779                    BuildersOrCallback::Callback(_, node_callback) => {
1780                        node_callback(self, next_stmt_id);
1781                    }
1782                }
1783
1784                *next_stmt_id += 1;
1785
1786                (fold_ident, input_location_id)
1787            }
1788
1789            HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
1790                let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
1791                    parse_quote!(reduce)
1792                } else {
1793                    parse_quote!(reduce_keyed)
1794                };
1795
1796                let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
1797                    self
1798                else {
1799                    unreachable!()
1800                };
1801
1802                let (input, lifetime) =
1803                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1804                        (input, quote!('static))
1805                    } else {
1806                        (input, quote!('tick))
1807                    };
1808
1809                let (input_ident, input_location_id) =
1810                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1811
1812                let reduce_ident =
1813                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1814
1815                match builders_or_callback {
1816                    BuildersOrCallback::Builders(graph_builders) => {
1817                        let builder = graph_builders.entry(input_location_id).or_default();
1818                        builder.add_dfir(
1819                            parse_quote! {
1820                                #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
1821                            },
1822                            None,
1823                            Some(&next_stmt_id.to_string()),
1824                        );
1825                    }
1826                    BuildersOrCallback::Callback(_, node_callback) => {
1827                        node_callback(self, next_stmt_id);
1828                    }
1829                }
1830
1831                *next_stmt_id += 1;
1832
1833                (reduce_ident, input_location_id)
1834            }
1835
1836            HydroNode::Network {
1837                from_key: _,
1838                to_location,
1839                to_key: _,
1840                serialize_fn: serialize_pipeline,
1841                instantiate_fn,
1842                deserialize_fn: deserialize_pipeline,
1843                input,
1844                ..
1845            } => {
1846                let (input_ident, input_location_id) =
1847                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1848
1849                let to_id = match *to_location {
1850                    LocationId::Process(id) => id,
1851                    LocationId::Cluster(id) => id,
1852                    LocationId::Tick(_, _) => panic!(),
1853                    LocationId::ExternalProcess(id) => id,
1854                };
1855
1856                let receiver_stream_ident =
1857                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1858
1859                match builders_or_callback {
1860                    BuildersOrCallback::Builders(graph_builders) => {
1861                        let (sink_expr, source_expr) = match instantiate_fn {
1862                            DebugInstantiate::Building => (
1863                                syn::parse_quote!(DUMMY_SINK),
1864                                syn::parse_quote!(DUMMY_SOURCE),
1865                            ),
1866
1867                            DebugInstantiate::Finalized(sink, source, _connect_fn) => {
1868                                (sink.clone(), source.clone())
1869                            }
1870                        };
1871
1872                        let sender_builder = graph_builders.entry(input_location_id).or_default();
1873                        if let Some(serialize_pipeline) = serialize_pipeline {
1874                            sender_builder.add_dfir(
1875                                parse_quote! {
1876                                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
1877                                },
1878                                None,
1879                                Some(&next_stmt_id.to_string()),
1880                            );
1881                        } else {
1882                            sender_builder.add_dfir(
1883                                parse_quote! {
1884                                    #input_ident -> dest_sink(#sink_expr);
1885                                },
1886                                None,
1887                                Some(&next_stmt_id.to_string()),
1888                            );
1889                        }
1890
1891                        let receiver_builder = graph_builders.entry(to_id).or_default();
1892                        if let Some(deserialize_pipeline) = deserialize_pipeline {
1893                            receiver_builder.add_dfir(parse_quote! {
1894                                #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
1895                            }, None, Some(&next_stmt_id.to_string()));
1896                        } else {
1897                            receiver_builder.add_dfir(
1898                                parse_quote! {
1899                                    #receiver_stream_ident = source_stream(#source_expr);
1900                                },
1901                                None,
1902                                Some(&next_stmt_id.to_string()),
1903                            );
1904                        }
1905                    }
1906                    BuildersOrCallback::Callback(_, node_callback) => {
1907                        node_callback(self, next_stmt_id);
1908                    }
1909                }
1910
1911                *next_stmt_id += 1;
1912
1913                (receiver_stream_ident, to_id)
1914            }
1915
1916            HydroNode::Counter {
1917                tag,
1918                duration,
1919                input,
1920                ..
1921            } => {
1922                let (input_ident, input_location_id) =
1923                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1924
1925                let counter_ident =
1926                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1927
1928                match builders_or_callback {
1929                    BuildersOrCallback::Builders(graph_builders) => {
1930                        let builder = graph_builders.entry(input_location_id).or_default();
1931                        builder.add_dfir(
1932                            parse_quote! {
1933                                #counter_ident = #input_ident -> _counter(#tag, #duration);
1934                            },
1935                            None,
1936                            Some(&next_stmt_id.to_string()),
1937                        );
1938                    }
1939                    BuildersOrCallback::Callback(_, node_callback) => {
1940                        node_callback(self, next_stmt_id);
1941                    }
1942                }
1943
1944                *next_stmt_id += 1;
1945
1946                (counter_ident, input_location_id)
1947            }
1948        }
1949    }
1950
1951    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1952        match self {
1953            HydroNode::Placeholder => {
1954                panic!()
1955            }
1956            HydroNode::Source { source, .. } => match source {
1957                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
1958                HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
1959            },
1960            HydroNode::CycleSource { .. }
1961            | HydroNode::Tee { .. }
1962            | HydroNode::Persist { .. }
1963            | HydroNode::Unpersist { .. }
1964            | HydroNode::Delta { .. }
1965            | HydroNode::Chain { .. }
1966            | HydroNode::CrossProduct { .. }
1967            | HydroNode::CrossSingleton { .. }
1968            | HydroNode::Join { .. }
1969            | HydroNode::Difference { .. }
1970            | HydroNode::AntiJoin { .. }
1971            | HydroNode::DeferTick { .. }
1972            | HydroNode::Enumerate { .. }
1973            | HydroNode::Unique { .. }
1974            | HydroNode::Sort { .. } => {}
1975            HydroNode::Map { f, .. }
1976            | HydroNode::FlatMap { f, .. }
1977            | HydroNode::Filter { f, .. }
1978            | HydroNode::FilterMap { f, .. }
1979            | HydroNode::Inspect { f, .. }
1980            | HydroNode::Reduce { f, .. }
1981            | HydroNode::ReduceKeyed { f, .. } => {
1982                transform(f);
1983            }
1984            HydroNode::Fold { init, acc, .. } | HydroNode::FoldKeyed { init, acc, .. } => {
1985                transform(init);
1986                transform(acc);
1987            }
1988            HydroNode::Network {
1989                serialize_fn,
1990                deserialize_fn,
1991                ..
1992            } => {
1993                if let Some(serialize_fn) = serialize_fn {
1994                    transform(serialize_fn);
1995                }
1996                if let Some(deserialize_fn) = deserialize_fn {
1997                    transform(deserialize_fn);
1998                }
1999            }
2000            HydroNode::Counter { duration, .. } => {
2001                transform(duration);
2002            }
2003        }
2004    }
2005
2006    pub fn metadata(&self) -> &HydroIrMetadata {
2007        match self {
2008            HydroNode::Placeholder => {
2009                panic!()
2010            }
2011            HydroNode::Source { metadata, .. } => metadata,
2012            HydroNode::CycleSource { metadata, .. } => metadata,
2013            HydroNode::Tee { metadata, .. } => metadata,
2014            HydroNode::Persist { metadata, .. } => metadata,
2015            HydroNode::Unpersist { metadata, .. } => metadata,
2016            HydroNode::Delta { metadata, .. } => metadata,
2017            HydroNode::Chain { metadata, .. } => metadata,
2018            HydroNode::CrossProduct { metadata, .. } => metadata,
2019            HydroNode::CrossSingleton { metadata, .. } => metadata,
2020            HydroNode::Join { metadata, .. } => metadata,
2021            HydroNode::Difference { metadata, .. } => metadata,
2022            HydroNode::AntiJoin { metadata, .. } => metadata,
2023            HydroNode::Map { metadata, .. } => metadata,
2024            HydroNode::FlatMap { metadata, .. } => metadata,
2025            HydroNode::Filter { metadata, .. } => metadata,
2026            HydroNode::FilterMap { metadata, .. } => metadata,
2027            HydroNode::DeferTick { metadata, .. } => metadata,
2028            HydroNode::Enumerate { metadata, .. } => metadata,
2029            HydroNode::Inspect { metadata, .. } => metadata,
2030            HydroNode::Unique { metadata, .. } => metadata,
2031            HydroNode::Sort { metadata, .. } => metadata,
2032            HydroNode::Fold { metadata, .. } => metadata,
2033            HydroNode::FoldKeyed { metadata, .. } => metadata,
2034            HydroNode::Reduce { metadata, .. } => metadata,
2035            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2036            HydroNode::Network { metadata, .. } => metadata,
2037            HydroNode::Counter { metadata, .. } => metadata,
2038        }
2039    }
2040
2041    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2042        match self {
2043            HydroNode::Placeholder => {
2044                panic!()
2045            }
2046            HydroNode::Source { metadata, .. } => metadata,
2047            HydroNode::CycleSource { metadata, .. } => metadata,
2048            HydroNode::Tee { metadata, .. } => metadata,
2049            HydroNode::Persist { metadata, .. } => metadata,
2050            HydroNode::Unpersist { metadata, .. } => metadata,
2051            HydroNode::Delta { metadata, .. } => metadata,
2052            HydroNode::Chain { metadata, .. } => metadata,
2053            HydroNode::CrossProduct { metadata, .. } => metadata,
2054            HydroNode::CrossSingleton { metadata, .. } => metadata,
2055            HydroNode::Join { metadata, .. } => metadata,
2056            HydroNode::Difference { metadata, .. } => metadata,
2057            HydroNode::AntiJoin { metadata, .. } => metadata,
2058            HydroNode::Map { metadata, .. } => metadata,
2059            HydroNode::FlatMap { metadata, .. } => metadata,
2060            HydroNode::Filter { metadata, .. } => metadata,
2061            HydroNode::FilterMap { metadata, .. } => metadata,
2062            HydroNode::DeferTick { metadata, .. } => metadata,
2063            HydroNode::Enumerate { metadata, .. } => metadata,
2064            HydroNode::Inspect { metadata, .. } => metadata,
2065            HydroNode::Unique { metadata, .. } => metadata,
2066            HydroNode::Sort { metadata, .. } => metadata,
2067            HydroNode::Fold { metadata, .. } => metadata,
2068            HydroNode::FoldKeyed { metadata, .. } => metadata,
2069            HydroNode::Reduce { metadata, .. } => metadata,
2070            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2071            HydroNode::Network { metadata, .. } => metadata,
2072            HydroNode::Counter { metadata, .. } => metadata,
2073        }
2074    }
2075
2076    pub fn print_root(&self) -> String {
2077        match self {
2078            HydroNode::Placeholder => {
2079                panic!()
2080            }
2081            HydroNode::Source { source, .. } => format!("Source({:?})", source),
2082            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
2083            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
2084            HydroNode::Persist { .. } => "Persist()".to_string(),
2085            HydroNode::Unpersist { .. } => "Unpersist()".to_string(),
2086            HydroNode::Delta { .. } => "Delta()".to_string(),
2087            HydroNode::Chain { first, second, .. } => {
2088                format!("Chain({}, {})", first.print_root(), second.print_root())
2089            }
2090            HydroNode::CrossProduct { left, right, .. } => {
2091                format!(
2092                    "CrossProduct({}, {})",
2093                    left.print_root(),
2094                    right.print_root()
2095                )
2096            }
2097            HydroNode::CrossSingleton { left, right, .. } => {
2098                format!(
2099                    "CrossSingleton({}, {})",
2100                    left.print_root(),
2101                    right.print_root()
2102                )
2103            }
2104            HydroNode::Join { left, right, .. } => {
2105                format!("Join({}, {})", left.print_root(), right.print_root())
2106            }
2107            HydroNode::Difference { pos, neg, .. } => {
2108                format!("Difference({}, {})", pos.print_root(), neg.print_root())
2109            }
2110            HydroNode::AntiJoin { pos, neg, .. } => {
2111                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
2112            }
2113            HydroNode::Map { f, .. } => format!("Map({:?})", f),
2114            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
2115            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
2116            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
2117            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
2118            HydroNode::Enumerate { is_static, .. } => format!("Enumerate({:?})", is_static),
2119            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
2120            HydroNode::Unique { .. } => "Unique()".to_string(),
2121            HydroNode::Sort { .. } => "Sort()".to_string(),
2122            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
2123            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
2124            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
2125            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
2126            HydroNode::Network { to_location, .. } => format!("Network(to {:?})", to_location),
2127            HydroNode::Counter { tag, duration, .. } => {
2128                format!("Counter({:?}, {:?})", tag, duration)
2129            }
2130        }
2131    }
2132}
2133
/// Looks up the deployed instance registered under `id`.
///
/// `kind` names the sort of location being looked up (`"process"`,
/// `"cluster"`, `"external"`) and is used only to build the panic message.
///
/// # Panics
///
/// Panics if `instances` has no entry for `id`.
#[cfg(feature = "build")]
fn lookup_instantiated<'m, T>(instances: &'m HashMap<usize, T>, id: usize, kind: &str) -> &'m T {
    instances.get(&id).unwrap_or_else(|| {
        panic!("A {} used in the graph was not instantiated: {}", kind, id)
    })
}

/// Builds the sink/source expressions and the connection callback for one
/// network edge running from `from_location` to `to_location`.
///
/// For each supported pairing, the endpoints are looked up in `nodes`,
/// `clusters`, or `externals`, one port is allocated on each endpoint through
/// `D`, and the matching `D::*_sink_source` / `D::*_connect` functions are
/// invoked.
///
/// Returns `(sink, source, connect_fn)`:
/// - `sink` is the expression for the sending side;
/// - `source` is the expression for the receiving side;
/// - `connect_fn` is the closure returned by the corresponding `D::*_connect`.
///
/// External endpoints are handled specially:
/// - external → process: the sink port is registered on the external under
///   `from_key`, and `sink` is the placeholder expression `DUMMY`;
/// - process → external: the source port is registered on the external under
///   `to_key`, and `source` is the placeholder expression `DUMMY`.
///
/// # Panics
///
/// - if an endpoint's id is missing from the corresponding map;
/// - if an external endpoint is involved and its key (`from_key` / `to_key`)
///   is `None`;
/// - if both endpoints are external processes;
/// - if either endpoint is a `LocationId::Tick`;
/// - for external ↔ cluster edges, which are not implemented yet.
#[cfg(feature = "build")]
#[expect(clippy::too_many_arguments, reason = "networking internals")]
fn instantiate_network<'a, D: Deploy<'a>>(
    from_location: &LocationId,
    from_key: Option<usize>,
    to_location: &LocationId,
    to_key: Option<usize>,
    nodes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    externals: &HashMap<usize, D::ExternalProcess>,
    compile_env: &D::CompileEnv,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>) {
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let from_node = lookup_instantiated(nodes, *from, "process").clone();
            let to_node = lookup_instantiated(nodes, *to, "process").clone();

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let from_node = lookup_instantiated(nodes, *from, "process").clone();
            let to_node = lookup_instantiated(clusters, *to, "cluster").clone();

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let from_node = lookup_instantiated(clusters, *from, "cluster").clone();
            let to_node = lookup_instantiated(nodes, *to, "process").clone();

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let from_node = lookup_instantiated(clusters, *from, "cluster").clone();
            let to_node = lookup_instantiated(clusters, *to, "cluster").clone();

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::ExternalProcess(from), LocationId::Process(to)) => {
            let from_node = lookup_instantiated(externals, *from, "external").clone();
            let to_node = lookup_instantiated(nodes, *to, "process").clone();

            let sink_port = D::allocate_external_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            // Record the sending port on the external under the caller-supplied key.
            from_node.register(
                from_key.expect("a network from an external process requires a `from_key`"),
                sink_port.clone(),
            );

            (
                (
                    // No sink expression is produced for the external side; a
                    // placeholder is returned in its position.
                    parse_quote!(DUMMY),
                    D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                ),
                D::e2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::ExternalProcess(_from), LocationId::Cluster(_to)) => {
            todo!("NYI")
        }
        (LocationId::ExternalProcess(_), LocationId::ExternalProcess(_)) => {
            panic!("Cannot send from external to external")
        }
        (LocationId::Process(from), LocationId::ExternalProcess(to)) => {
            let from_node = lookup_instantiated(nodes, *from, "process").clone();
            let to_node = lookup_instantiated(externals, *to, "external").clone();

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_external_port(&to_node);

            // Record the receiving port on the external under the caller-supplied key.
            to_node.register(
                to_key.expect("a network to an external process requires a `to_key`"),
                source_port.clone(),
            );

            (
                (
                    D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
                    // No source expression is produced for the external side; a
                    // placeholder is returned in its position.
                    parse_quote!(DUMMY),
                ),
                D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(_from), LocationId::ExternalProcess(_to)) => {
            todo!("NYI")
        }
        (LocationId::Tick(_, _), _) | (_, LocationId::Tick(_, _)) => {
            panic!("Cannot instantiate a network with a tick location as an endpoint")
        }
    };

    (sink, source, connect_fn)
}