Skip to main content

hydro_lang/viz/
render.rs

1use std::collections::{HashMap, HashSet};
2use std::error::Error;
3use std::fmt::{Display, Write};
4use std::num::ParseIntError;
5use std::sync::OnceLock;
6
7use auto_impl::auto_impl;
8use slotmap::{Key, SecondaryMap, SlotMap};
9
10pub use super::graphviz::{HydroDot, escape_dot};
11pub use super::json::HydroJson;
12// Re-export specific implementations
13pub use super::mermaid::{HydroMermaid, escape_mermaid};
14use crate::compile::ir::backtrace::Backtrace;
15use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
16use crate::location::dynamic::LocationId;
17use crate::location::{LocationKey, LocationType};
18
19/// Label for a graph node - can be either a static string or contain expressions.
20#[derive(Debug, Clone)]
21pub enum NodeLabel {
22    /// A static string label
23    Static(String),
24    /// A label with an operation name and expression arguments
25    WithExprs {
26        op_name: String,
27        exprs: Vec<DebugExpr>,
28    },
29}
30
31impl NodeLabel {
32    /// Create a static label
33    pub fn static_label(s: String) -> Self {
34        Self::Static(s)
35    }
36
37    /// Create a label for an operation with multiple expression
38    pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
39        Self::WithExprs { op_name, exprs }
40    }
41}
42
43impl Display for NodeLabel {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self {
46            Self::Static(s) => write!(f, "{}", s),
47            Self::WithExprs { op_name, exprs } => {
48                if exprs.is_empty() {
49                    write!(f, "{}()", op_name)
50                } else {
51                    let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
52                    write!(f, "{}({})", op_name, expr_strs.join(", "))
53                }
54            }
55        }
56    }
57}
58
59/// Base struct for text-based graph writers that use indentation.
60/// Contains common fields shared by DOT and Mermaid writers.
61pub struct IndentedGraphWriter<'a, W> {
62    pub write: W,
63    pub indent: usize,
64    pub config: HydroWriteConfig<'a>,
65}
66
67impl<'a, W> IndentedGraphWriter<'a, W> {
68    /// Create a new writer with default configuration.
69    pub fn new(write: W) -> Self {
70        Self {
71            write,
72            indent: 0,
73            config: HydroWriteConfig::default(),
74        }
75    }
76
77    /// Create a new writer with the given configuration.
78    pub fn new_with_config(write: W, config: HydroWriteConfig<'a>) -> Self {
79        Self {
80            write,
81            indent: 0,
82            config,
83        }
84    }
85}
86
87impl<W: Write> IndentedGraphWriter<'_, W> {
88    /// Write an indented line using the current indentation level.
89    pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
90        writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
91    }
92}
93
94/// Common error type used by all graph writers.
95pub type GraphWriteError = std::fmt::Error;
96
97/// Trait for writing textual representations of Hydro IR graphs, i.e. mermaid or dot graphs.
98#[auto_impl(&mut, Box)]
99pub trait HydroGraphWrite {
100    /// Error type emitted by writing.
101    type Err: Error;
102
103    /// Begin the graph. First method called.
104    fn write_prologue(&mut self) -> Result<(), Self::Err>;
105
106    /// Write a node definition with styling.
107    fn write_node_definition(
108        &mut self,
109        node_id: VizNodeKey,
110        node_label: &NodeLabel,
111        node_type: HydroNodeType,
112        location_key: Option<LocationKey>,
113        location_type: Option<LocationType>,
114        backtrace: Option<&Backtrace>,
115    ) -> Result<(), Self::Err>;
116
117    /// Write an edge between nodes with optional labeling.
118    fn write_edge(
119        &mut self,
120        src_id: VizNodeKey,
121        dst_id: VizNodeKey,
122        edge_properties: &HashSet<HydroEdgeProp>,
123        label: Option<&str>,
124    ) -> Result<(), Self::Err>;
125
126    /// Begin writing a location grouping (process/cluster).
127    fn write_location_start(
128        &mut self,
129        location_key: LocationKey,
130        location_type: LocationType,
131    ) -> Result<(), Self::Err>;
132
133    /// Write a node within a location.
134    fn write_node(&mut self, node_id: VizNodeKey) -> Result<(), Self::Err>;
135
136    /// End writing a location grouping.
137    fn write_location_end(&mut self) -> Result<(), Self::Err>;
138
139    /// End the graph. Last method called.
140    fn write_epilogue(&mut self) -> Result<(), Self::Err>;
141}
142
143/// Node type utilities - centralized handling of HydroNodeType operations
144pub mod node_type_utils {
145    use super::HydroNodeType;
146
147    /// All node types with their string names
148    const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
149        (HydroNodeType::Source, "Source"),
150        (HydroNodeType::Transform, "Transform"),
151        (HydroNodeType::Join, "Join"),
152        (HydroNodeType::Aggregation, "Aggregation"),
153        (HydroNodeType::Network, "Network"),
154        (HydroNodeType::Sink, "Sink"),
155        (HydroNodeType::Tee, "Tee"),
156        (HydroNodeType::NonDeterministic, "NonDeterministic"),
157    ];
158
159    /// Convert HydroNodeType to string representation (used by JSON format)
160    pub fn to_string(node_type: HydroNodeType) -> &'static str {
161        NODE_TYPE_DATA
162            .iter()
163            .find(|(nt, _)| *nt == node_type)
164            .map(|(_, name)| *name)
165            .unwrap_or("Unknown")
166    }
167
168    /// Get all node types with their string representations (used by JSON format)
169    pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
170        NODE_TYPE_DATA.to_vec()
171    }
172}
173
174/// Types of nodes in Hydro IR for styling purposes.
175#[derive(Debug, Clone, Copy, PartialEq, Eq)]
176pub enum HydroNodeType {
177    Source,
178    Transform,
179    Join,
180    Aggregation,
181    Network,
182    Sink,
183    Tee,
184    NonDeterministic,
185}
186
187/// Types of edges in Hydro IR representing stream properties.
188#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
189pub enum HydroEdgeProp {
190    Bounded,
191    Unbounded,
192    TotalOrder,
193    NoOrder,
194    Keyed,
195    // Collection type tags for styling
196    Stream,
197    KeyedSingleton,
198    KeyedStream,
199    Singleton,
200    Optional,
201    Network,
202    Cycle,
203}
204
205/// Unified edge style representation for all graph formats.
206/// This intermediate format allows consistent styling across JSON, DOT, and Mermaid.
207#[derive(Debug, Clone, PartialEq, Eq)]
208pub struct UnifiedEdgeStyle {
209    /// Line pattern (solid, dashed)
210    pub line_pattern: LinePattern,
211    /// Line width (1 = thin, 3 = thick)
212    pub line_width: u8,
213    /// Arrowhead style
214    pub arrowhead: ArrowheadStyle,
215    /// Line style (single plain line, or line with hash marks/dots for keyed streams)
216    pub line_style: LineStyle,
217    /// Halo/background effect for boundedness
218    pub halo: HaloStyle,
219    /// Line waviness for ordering information
220    pub waviness: WavinessStyle,
221    /// Whether animation is enabled (JSON only)
222    pub animation: AnimationStyle,
223    /// Color for the edge
224    pub color: &'static str,
225}
226
227#[derive(Debug, Clone, Copy, PartialEq, Eq)]
228pub enum LinePattern {
229    Solid,
230    Dotted,
231    Dashed,
232}
233
234#[derive(Debug, Clone, Copy, PartialEq, Eq)]
235pub enum ArrowheadStyle {
236    TriangleFilled,
237    CircleFilled,
238    DiamondOpen,
239    Default,
240}
241
242#[derive(Debug, Clone, Copy, PartialEq, Eq)]
243pub enum LineStyle {
244    /// Plain single line
245    Single,
246    /// Single line with hash marks/dots (for keyed streams)
247    HashMarks,
248}
249
250#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251pub enum HaloStyle {
252    None,
253    LightBlue,
254}
255
256#[derive(Debug, Clone, Copy, PartialEq, Eq)]
257pub enum WavinessStyle {
258    None,
259    Wavy,
260}
261
262#[derive(Debug, Clone, Copy, PartialEq, Eq)]
263pub enum AnimationStyle {
264    Static,
265    Animated,
266}
267
268impl Default for UnifiedEdgeStyle {
269    fn default() -> Self {
270        Self {
271            line_pattern: LinePattern::Solid,
272            line_width: 1,
273            arrowhead: ArrowheadStyle::Default,
274            line_style: LineStyle::Single,
275            halo: HaloStyle::None,
276            waviness: WavinessStyle::None,
277            animation: AnimationStyle::Static,
278            color: "#666666",
279        }
280    }
281}
282
283/// Convert HydroEdgeType properties to unified edge style.
284/// This is the core logic for determining edge visual properties.
285///
286/// # Visual Encoding Mapping
287///
288/// | Semantic Property | Visual Channel | Values |
289/// |------------------|----------------|---------|
290/// | Network | Line Pattern + Animation | Local (solid, static), Network (dashed, animated) |
291/// | Ordering | Waviness | TotalOrder (straight), NoOrder (wavy) |
292/// | Boundedness | Halo | Bounded (none), Unbounded (light-blue transparent) |
293/// | Keyedness | Line Style | NotKeyed (plain line), Keyed (line with hash marks/dots) |
294/// | Collection Type | Color + Arrowhead | Stream (blue #2563eb, triangle), Singleton (black, circle), Optional (gray, diamond) |
295pub fn get_unified_edge_style(
296    edge_properties: &HashSet<HydroEdgeProp>,
297    src_location: Option<usize>,
298    dst_location: Option<usize>,
299) -> UnifiedEdgeStyle {
300    let mut style = UnifiedEdgeStyle::default();
301
302    // Network communication group - controls line pattern AND animation
303    let is_network = edge_properties.contains(&HydroEdgeProp::Network)
304        || (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
305
306    if is_network {
307        style.line_pattern = LinePattern::Dashed;
308        style.animation = AnimationStyle::Animated;
309    } else {
310        style.line_pattern = LinePattern::Solid;
311        style.animation = AnimationStyle::Static;
312    }
313
314    // Boundedness group - controls halo
315    if edge_properties.contains(&HydroEdgeProp::Unbounded) {
316        style.halo = HaloStyle::LightBlue;
317    } else {
318        style.halo = HaloStyle::None;
319    }
320
321    // Collection type group - controls arrowhead and color
322    if edge_properties.contains(&HydroEdgeProp::Stream) {
323        style.arrowhead = ArrowheadStyle::TriangleFilled;
324        style.color = "#2563eb"; // Bright blue for Stream
325    } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
326        style.arrowhead = ArrowheadStyle::TriangleFilled;
327        style.color = "#2563eb"; // Bright blue for Stream (keyed variant)
328    } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
329        style.arrowhead = ArrowheadStyle::TriangleFilled;
330        style.color = "#000000"; // Black for Singleton (keyed variant)
331    } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
332        style.arrowhead = ArrowheadStyle::CircleFilled;
333        style.color = "#000000"; // Black for Singleton
334    } else if edge_properties.contains(&HydroEdgeProp::Optional) {
335        style.arrowhead = ArrowheadStyle::DiamondOpen;
336        style.color = "#6b7280"; // Gray for Optional
337    }
338
339    // Keyedness group - controls hash marks on the line
340    if edge_properties.contains(&HydroEdgeProp::Keyed) {
341        style.line_style = LineStyle::HashMarks; // Renders as hash marks/dots on the line in hydroscope
342    } else {
343        style.line_style = LineStyle::Single;
344    }
345
346    // Ordering group - waviness channel
347    if edge_properties.contains(&HydroEdgeProp::NoOrder) {
348        style.waviness = WavinessStyle::Wavy;
349    } else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
350        style.waviness = WavinessStyle::None;
351    }
352
353    style
354}
355
356/// Extract semantic edge properties from CollectionKind metadata.
357/// This function analyzes the collection type and extracts relevant semantic tags
358/// for visualization purposes.
359pub fn extract_edge_properties_from_collection_kind(
360    collection_kind: &crate::compile::ir::CollectionKind,
361) -> HashSet<HydroEdgeProp> {
362    use crate::compile::ir::CollectionKind;
363
364    let mut properties = HashSet::new();
365
366    match collection_kind {
367        CollectionKind::Stream { bound, order, .. } => {
368            properties.insert(HydroEdgeProp::Stream);
369            add_bound_property(&mut properties, bound);
370            add_order_property(&mut properties, order);
371        }
372        CollectionKind::KeyedStream {
373            bound, value_order, ..
374        } => {
375            properties.insert(HydroEdgeProp::KeyedStream);
376            properties.insert(HydroEdgeProp::Keyed);
377            add_bound_property(&mut properties, bound);
378            add_order_property(&mut properties, value_order);
379        }
380        CollectionKind::Singleton { bound, .. } => {
381            properties.insert(HydroEdgeProp::Singleton);
382            add_singleton_bound_property(&mut properties, bound);
383            // Singletons have implicit TotalOrder
384            properties.insert(HydroEdgeProp::TotalOrder);
385        }
386        CollectionKind::Optional { bound, .. } => {
387            properties.insert(HydroEdgeProp::Optional);
388            add_bound_property(&mut properties, bound);
389            // Optionals have implicit TotalOrder
390            properties.insert(HydroEdgeProp::TotalOrder);
391        }
392        CollectionKind::KeyedSingleton { bound, .. } => {
393            properties.insert(HydroEdgeProp::Singleton);
394            properties.insert(HydroEdgeProp::Keyed);
395            // KeyedSingletons boundedness depends on the bound kind
396            add_keyed_singleton_bound_property(&mut properties, bound);
397            properties.insert(HydroEdgeProp::TotalOrder);
398        }
399    }
400
401    properties
402}
403
404/// Helper function to add bound property based on BoundKind.
405fn add_bound_property(
406    properties: &mut HashSet<HydroEdgeProp>,
407    bound: &crate::compile::ir::BoundKind,
408) {
409    use crate::compile::ir::BoundKind;
410
411    match bound {
412        BoundKind::Bounded => {
413            properties.insert(HydroEdgeProp::Bounded);
414        }
415        BoundKind::Unbounded => {
416            properties.insert(HydroEdgeProp::Unbounded);
417        }
418    }
419}
420
421/// Helper function to add bound property for Singleton based on SingletonBoundKind.
422fn add_singleton_bound_property(
423    properties: &mut HashSet<HydroEdgeProp>,
424    bound: &crate::compile::ir::SingletonBoundKind,
425) {
426    use crate::compile::ir::SingletonBoundKind;
427
428    match bound {
429        SingletonBoundKind::Bounded => {
430            properties.insert(HydroEdgeProp::Bounded);
431        }
432        SingletonBoundKind::Monotonic | SingletonBoundKind::Unbounded => {
433            properties.insert(HydroEdgeProp::Unbounded);
434        }
435    }
436}
437
438/// Helper function to add bound property for KeyedSingleton based on KeyedSingletonBoundKind.
439fn add_keyed_singleton_bound_property(
440    properties: &mut HashSet<HydroEdgeProp>,
441    bound: &crate::compile::ir::KeyedSingletonBoundKind,
442) {
443    use crate::compile::ir::KeyedSingletonBoundKind;
444
445    match bound {
446        KeyedSingletonBoundKind::Bounded => {
447            properties.insert(HydroEdgeProp::Bounded);
448        }
449        KeyedSingletonBoundKind::BoundedValue
450        | KeyedSingletonBoundKind::MonotonicValue
451        | KeyedSingletonBoundKind::Unbounded => {
452            properties.insert(HydroEdgeProp::Unbounded);
453        }
454    }
455}
456
457/// Helper function to add order property based on StreamOrder.
458fn add_order_property(
459    properties: &mut HashSet<HydroEdgeProp>,
460    order: &crate::compile::ir::StreamOrder,
461) {
462    use crate::compile::ir::StreamOrder;
463
464    match order {
465        StreamOrder::TotalOrder => {
466            properties.insert(HydroEdgeProp::TotalOrder);
467        }
468        StreamOrder::NoOrder => {
469            properties.insert(HydroEdgeProp::NoOrder);
470        }
471    }
472}
473
474/// Detect if an edge crosses network boundaries by comparing source and destination locations.
475/// Returns true if the edge represents network communication between different locations.
476pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
477    // Compare the root locations to determine if they differ
478    src_location.root() != dst_location.root()
479}
480
481/// Add network edge tag if source and destination locations differ.
482pub fn add_network_edge_tag(
483    properties: &mut HashSet<HydroEdgeProp>,
484    src_location: &LocationId,
485    dst_location: &LocationId,
486) {
487    if is_network_edge(src_location, dst_location) {
488        properties.insert(HydroEdgeProp::Network);
489    }
490}
491
492/// Configuration for graph writing.
493#[derive(Debug, Clone, Copy)]
494pub struct HydroWriteConfig<'a> {
495    pub show_metadata: bool,
496    pub show_location_groups: bool,
497    pub use_short_labels: bool,
498    pub location_names: &'a SecondaryMap<LocationKey, String>,
499}
500
501impl Default for HydroWriteConfig<'_> {
502    fn default() -> Self {
503        static EMPTY: OnceLock<SecondaryMap<LocationKey, String>> = OnceLock::new();
504        Self {
505            show_metadata: false,
506            show_location_groups: true,
507            use_short_labels: true, // Default to short labels for all renderers
508            location_names: EMPTY.get_or_init(SecondaryMap::new),
509        }
510    }
511}
512
513/// Node information in the Hydro graph.
514#[derive(Clone)]
515pub struct HydroGraphNode {
516    pub label: NodeLabel,
517    pub node_type: HydroNodeType,
518    pub location_key: Option<LocationKey>,
519    pub backtrace: Option<Backtrace>,
520}
521
522slotmap::new_key_type! {
523    /// Unique identifier for nodes in the visualization graph.
524    ///
525    /// This is counted/allocated separately from any other IDs within `hydro_lang`.
526    pub struct VizNodeKey;
527}
528
529impl Display for VizNodeKey {
530    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
531        write!(f, "viz{:?}", self.data()) // `"viz1v1"``
532    }
533}
534
535/// This is used by the visualizer
536/// TODO(mingwei): Make this more robust?
537impl std::str::FromStr for VizNodeKey {
538    type Err = Option<ParseIntError>;
539
540    fn from_str(s: &str) -> Result<Self, Self::Err> {
541        let nvn = s.strip_prefix("viz").ok_or(None)?;
542        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
543        let idx: u64 = idx.parse()?;
544        let ver: u64 = ver.parse()?;
545        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
546    }
547}
548
549impl VizNodeKey {
550    /// A key for testing with index 1.
551    #[cfg(test)]
552    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x0000008f00000001)); // `1v143`
553
554    /// A key for testing with index 2.
555    #[cfg(test)]
556    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x0000008f00000002)); // `2v143`
557}
558
559/// Edge information in the Hydro graph.
560#[derive(Debug, Clone)]
561pub struct HydroGraphEdge {
562    pub src: VizNodeKey,
563    pub dst: VizNodeKey,
564    pub edge_properties: HashSet<HydroEdgeProp>,
565    pub label: Option<String>,
566}
567
568/// Graph structure tracker for Hydro IR rendering.
569#[derive(Default)]
570pub struct HydroGraphStructure {
571    pub nodes: SlotMap<VizNodeKey, HydroGraphNode>,
572    pub edges: Vec<HydroGraphEdge>,
573    pub locations: SecondaryMap<LocationKey, LocationType>,
574}
575
576impl HydroGraphStructure {
577    pub fn new() -> Self {
578        Self::default()
579    }
580
581    pub fn add_node(
582        &mut self,
583        label: NodeLabel,
584        node_type: HydroNodeType,
585        location_key: Option<LocationKey>,
586    ) -> VizNodeKey {
587        self.add_node_with_backtrace(label, node_type, location_key, None)
588    }
589
590    pub fn add_node_with_backtrace(
591        &mut self,
592        label: NodeLabel,
593        node_type: HydroNodeType,
594        location_key: Option<LocationKey>,
595        backtrace: Option<Backtrace>,
596    ) -> VizNodeKey {
597        self.nodes.insert(HydroGraphNode {
598            label,
599            node_type,
600            location_key,
601            backtrace,
602        })
603    }
604
605    /// Add a node with metadata, extracting backtrace automatically
606    pub fn add_node_with_metadata(
607        &mut self,
608        label: NodeLabel,
609        node_type: HydroNodeType,
610        metadata: &HydroIrMetadata,
611    ) -> VizNodeKey {
612        let location_key = Some(setup_location(self, metadata));
613        let backtrace = Some(metadata.op.backtrace.clone());
614        self.add_node_with_backtrace(label, node_type, location_key, backtrace)
615    }
616
617    pub fn add_edge(
618        &mut self,
619        src: VizNodeKey,
620        dst: VizNodeKey,
621        edge_properties: HashSet<HydroEdgeProp>,
622        label: Option<String>,
623    ) {
624        self.edges.push(HydroGraphEdge {
625            src,
626            dst,
627            edge_properties,
628            label,
629        });
630    }
631
632    // Legacy method for backward compatibility
633    pub fn add_edge_single(
634        &mut self,
635        src: VizNodeKey,
636        dst: VizNodeKey,
637        edge_type: HydroEdgeProp,
638        label: Option<String>,
639    ) {
640        let mut properties = HashSet::new();
641        properties.insert(edge_type);
642        self.edges.push(HydroGraphEdge {
643            src,
644            dst,
645            edge_properties: properties,
646            label,
647        });
648    }
649
650    pub fn add_location(&mut self, location_key: LocationKey, location_type: LocationType) {
651        self.locations.insert(location_key, location_type);
652    }
653}
654
655/// Function to extract an op_name from a print_root() result for use in labels.
656pub fn extract_op_name(full_label: String) -> String {
657    full_label
658        .split('(')
659        .next()
660        .unwrap_or("unknown")
661        .to_lowercase()
662}
663
664/// Extract a short, readable label from the full token stream label using print_root() style naming
665pub fn extract_short_label(full_label: &str) -> String {
666    // Use the same logic as extract_op_name but handle the specific cases we need for UI display
667    if let Some(op_name) = full_label.split('(').next() {
668        let base_name = op_name.to_lowercase();
669        match base_name.as_str() {
670            // Handle special cases for UI display
671            "source" => {
672                if full_label.contains("Iter") {
673                    "source_iter".to_owned()
674                } else if full_label.contains("Stream") {
675                    "source_stream".to_owned()
676                } else if full_label.contains("ExternalNetwork") {
677                    "external_network".to_owned()
678                } else if full_label.contains("Spin") {
679                    "spin".to_owned()
680                } else {
681                    "source".to_owned()
682                }
683            }
684            "network" => {
685                if full_label.contains("deser") {
686                    "network(recv)".to_owned()
687                } else if full_label.contains("ser") {
688                    "network(send)".to_owned()
689                } else {
690                    "network".to_owned()
691                }
692            }
693            // For all other cases, just use the lowercase base name (same as extract_op_name)
694            _ => base_name,
695        }
696    } else {
697        // Fallback for labels that don't follow the pattern
698        if full_label.len() > 20 {
699            format!("{}...", &full_label[..17])
700        } else {
701            full_label.to_owned()
702        }
703    }
704}
705
706/// Helper function to set up location in structure from metadata.
707fn setup_location(structure: &mut HydroGraphStructure, metadata: &HydroIrMetadata) -> LocationKey {
708    let root = metadata.location_id.root();
709    let location_key = root.key();
710    let location_type = root.location_type().unwrap();
711    structure.add_location(location_key, location_type);
712    location_key
713}
714
715/// Helper function to add an edge with semantic tags extracted from metadata.
716/// This function combines collection kind extraction with network detection.
717fn add_edge_with_metadata(
718    structure: &mut HydroGraphStructure,
719    src_id: VizNodeKey,
720    dst_id: VizNodeKey,
721    src_metadata: Option<&HydroIrMetadata>,
722    dst_metadata: Option<&HydroIrMetadata>,
723    label: Option<String>,
724) {
725    let mut properties = HashSet::new();
726
727    // Extract semantic tags from source metadata's collection kind
728    if let Some(metadata) = src_metadata {
729        properties.extend(extract_edge_properties_from_collection_kind(
730            &metadata.collection_kind,
731        ));
732    }
733
734    // Add network edge tag if locations differ
735    if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
736        add_network_edge_tag(
737            &mut properties,
738            &src_meta.location_id,
739            &dst_meta.location_id,
740        );
741    }
742
743    // If no properties were extracted, default to Stream
744    if properties.is_empty() {
745        properties.insert(HydroEdgeProp::Stream);
746    }
747
748    structure.add_edge(src_id, dst_id, properties, label);
749}
750
751/// Helper function to write a graph structure using any GraphWrite implementation
752fn write_graph_structure<W>(
753    structure: &HydroGraphStructure,
754    graph_write: W,
755    config: HydroWriteConfig<'_>,
756) -> Result<(), W::Err>
757where
758    W: HydroGraphWrite,
759{
760    let mut graph_write = graph_write;
761    // Write the graph
762    graph_write.write_prologue()?;
763
764    // Write node definitions
765    for (node_id, node) in structure.nodes.iter() {
766        let location_type = node
767            .location_key
768            .and_then(|loc_key| structure.locations.get(loc_key))
769            .copied();
770
771        graph_write.write_node_definition(
772            node_id,
773            &node.label,
774            node.node_type,
775            node.location_key,
776            location_type,
777            node.backtrace.as_ref(),
778        )?;
779    }
780
781    // Group nodes by location if requested
782    if config.show_location_groups {
783        let mut nodes_by_location = SecondaryMap::<LocationKey, Vec<VizNodeKey>>::new();
784        for (node_id, node) in structure.nodes.iter() {
785            if let Some(location_key) = node.location_key {
786                nodes_by_location
787                    .entry(location_key)
788                    .expect("location was removed")
789                    .or_default()
790                    .push(node_id);
791            }
792        }
793
794        for (location_key, node_ids) in nodes_by_location.iter() {
795            if let Some(&location_type) = structure.locations.get(location_key) {
796                graph_write.write_location_start(location_key, location_type)?;
797                for &node_id in node_ids.iter() {
798                    graph_write.write_node(node_id)?;
799                }
800                graph_write.write_location_end()?;
801            }
802        }
803    }
804
805    // Write edges
806    for edge in structure.edges.iter() {
807        graph_write.write_edge(
808            edge.src,
809            edge.dst,
810            &edge.edge_properties,
811            edge.label.as_deref(),
812        )?;
813    }
814
815    graph_write.write_epilogue()?;
816    Ok(())
817}
818
819impl HydroRoot {
820    /// Build the graph structure by traversing the IR tree.
821    pub fn build_graph_structure(
822        &self,
823        structure: &mut HydroGraphStructure,
824        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
825        config: HydroWriteConfig<'_>,
826    ) -> VizNodeKey {
827        // Helper function for sink nodes to reduce duplication
828        fn build_sink_node(
829            structure: &mut HydroGraphStructure,
830            seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
831            config: HydroWriteConfig<'_>,
832            input: &HydroNode,
833            sink_metadata: Option<&HydroIrMetadata>,
834            label: NodeLabel,
835        ) -> VizNodeKey {
836            let input_id = input.build_graph_structure(structure, seen_tees, config);
837
838            // If no explicit metadata is provided, extract it from the input node
839            let effective_metadata = if let Some(meta) = sink_metadata {
840                Some(meta)
841            } else {
842                match input {
843                    HydroNode::Placeholder => None,
844                    // All other variants have metadata
845                    _ => Some(input.metadata()),
846                }
847            };
848
849            let location_key = effective_metadata.map(|m| setup_location(structure, m));
850            let sink_id = structure.add_node_with_backtrace(
851                label,
852                HydroNodeType::Sink,
853                location_key,
854                effective_metadata.map(|m| m.op.backtrace.clone()),
855            );
856
857            // Extract semantic tags from input metadata
858            let input_metadata = input.metadata();
859            add_edge_with_metadata(
860                structure,
861                input_id,
862                sink_id,
863                Some(input_metadata),
864                sink_metadata,
865                None,
866            );
867
868            sink_id
869        }
870
871        match self {
872            // Sink operations - semantic tags extracted from input metadata
873            HydroRoot::ForEach { f, input, .. } => build_sink_node(
874                structure,
875                seen_tees,
876                config,
877                input,
878                None,
879                NodeLabel::with_exprs("for_each".to_owned(), vec![f.clone()]),
880            ),
881
882            HydroRoot::SendExternal {
883                to_external_key,
884                to_port_id,
885                input,
886                ..
887            } => build_sink_node(
888                structure,
889                seen_tees,
890                config,
891                input,
892                None,
893                NodeLabel::with_exprs(
894                    format!("send_external({}:{})", to_external_key, to_port_id),
895                    vec![],
896                ),
897            ),
898
899            HydroRoot::DestSink { sink, input, .. } => build_sink_node(
900                structure,
901                seen_tees,
902                config,
903                input,
904                None,
905                NodeLabel::with_exprs("dest_sink".to_owned(), vec![sink.clone()]),
906            ),
907
908            HydroRoot::CycleSink {
909                cycle_id, input, ..
910            } => build_sink_node(
911                structure,
912                seen_tees,
913                config,
914                input,
915                None,
916                NodeLabel::static_label(format!("cycle_sink({})", cycle_id)),
917            ),
918
919            HydroRoot::EmbeddedOutput { ident, input, .. } => build_sink_node(
920                structure,
921                seen_tees,
922                config,
923                input,
924                None,
925                NodeLabel::static_label(format!("embedded_output({})", ident)),
926            ),
927
928            HydroRoot::Null { input, .. } => build_sink_node(
929                structure,
930                seen_tees,
931                config,
932                input,
933                None,
934                NodeLabel::static_label("null".to_owned()),
935            ),
936        }
937    }
938}
939
940impl HydroNode {
941    /// Build the graph structure recursively for this node.
942    pub fn build_graph_structure(
943        &self,
944        structure: &mut HydroGraphStructure,
945        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
946        config: HydroWriteConfig<'_>,
947    ) -> VizNodeKey {
948        // Helper functions to reduce duplication, categorized by input/expression patterns
949
950        /// Common parameters for transform builder functions to reduce argument count
951        struct TransformParams<'a> {
952            structure: &'a mut HydroGraphStructure,
953            seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
954            config: HydroWriteConfig<'a>,
955            input: &'a HydroNode,
956            metadata: &'a HydroIrMetadata,
957            op_name: String,
958            node_type: HydroNodeType,
959        }
960
961        // Single-input transform with no expressions
962        fn build_simple_transform(params: TransformParams) -> VizNodeKey {
963            let input_id = params.input.build_graph_structure(
964                params.structure,
965                params.seen_tees,
966                params.config,
967            );
968            let node_id = params.structure.add_node_with_metadata(
969                NodeLabel::Static(params.op_name.to_string()),
970                params.node_type,
971                params.metadata,
972            );
973
974            // Extract semantic tags from input metadata
975            let input_metadata = params.input.metadata();
976            add_edge_with_metadata(
977                params.structure,
978                input_id,
979                node_id,
980                Some(input_metadata),
981                Some(params.metadata),
982                None,
983            );
984
985            node_id
986        }
987
988        // Single-input transform with one expression
989        fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> VizNodeKey {
990            let input_id = params.input.build_graph_structure(
991                params.structure,
992                params.seen_tees,
993                params.config,
994            );
995            let node_id = params.structure.add_node_with_metadata(
996                NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
997                params.node_type,
998                params.metadata,
999            );
1000
1001            // Extract semantic tags from input metadata
1002            let input_metadata = params.input.metadata();
1003            add_edge_with_metadata(
1004                params.structure,
1005                input_id,
1006                node_id,
1007                Some(input_metadata),
1008                Some(params.metadata),
1009                None,
1010            );
1011
1012            node_id
1013        }
1014
1015        // Single-input transform with two expressions
1016        fn build_dual_expr_transform(
1017            params: TransformParams,
1018            expr1: &DebugExpr,
1019            expr2: &DebugExpr,
1020        ) -> VizNodeKey {
1021            let input_id = params.input.build_graph_structure(
1022                params.structure,
1023                params.seen_tees,
1024                params.config,
1025            );
1026            let node_id = params.structure.add_node_with_metadata(
1027                NodeLabel::with_exprs(
1028                    params.op_name.to_string(),
1029                    vec![expr1.clone(), expr2.clone()],
1030                ),
1031                params.node_type,
1032                params.metadata,
1033            );
1034
1035            // Extract semantic tags from input metadata
1036            let input_metadata = params.input.metadata();
1037            add_edge_with_metadata(
1038                params.structure,
1039                input_id,
1040                node_id,
1041                Some(input_metadata),
1042                Some(params.metadata),
1043                None,
1044            );
1045
1046            node_id
1047        }
1048
1049        // Helper function for source nodes
1050        fn build_source_node(
1051            structure: &mut HydroGraphStructure,
1052            metadata: &HydroIrMetadata,
1053            label: String,
1054        ) -> VizNodeKey {
1055            structure.add_node_with_metadata(
1056                NodeLabel::Static(label),
1057                HydroNodeType::Source,
1058                metadata,
1059            )
1060        }
1061
1062        match self {
1063            HydroNode::Placeholder => structure.add_node(
1064                NodeLabel::Static("PLACEHOLDER".to_owned()),
1065                HydroNodeType::Transform,
1066                None,
1067            ),
1068
1069            HydroNode::Source {
1070                source, metadata, ..
1071            } => {
1072                let label = match source {
1073                    HydroSource::Stream(expr) => format!("source_stream({})", expr),
1074                    HydroSource::ExternalNetwork() => "external_network()".to_owned(),
1075                    HydroSource::Iter(expr) => format!("source_iter({})", expr),
1076                    HydroSource::Spin() => "spin()".to_owned(),
1077                    HydroSource::ClusterMembers(location_id, _) => {
1078                        format!(
1079                            "source_stream(cluster_membership_stream({:?}))",
1080                            location_id
1081                        )
1082                    }
1083                    HydroSource::Embedded(ident) => {
1084                        format!("embedded_input({})", ident)
1085                    }
1086                    HydroSource::EmbeddedSingleton(ident) => {
1087                        format!("embedded_singleton_input({})", ident)
1088                    }
1089                };
1090                build_source_node(structure, metadata, label)
1091            }
1092
1093            HydroNode::SingletonSource {
1094                value,
1095                first_tick_only,
1096                metadata,
1097            } => {
1098                let label = if *first_tick_only {
1099                    format!("singleton_first_tick({})", value)
1100                } else {
1101                    format!("singleton({})", value)
1102                };
1103                build_source_node(structure, metadata, label)
1104            }
1105
1106            HydroNode::ExternalInput {
1107                from_external_key,
1108                from_port_id,
1109                metadata,
1110                ..
1111            } => build_source_node(
1112                structure,
1113                metadata,
1114                format!("external_input({}:{})", from_external_key, from_port_id),
1115            ),
1116
1117            HydroNode::CycleSource {
1118                cycle_id, metadata, ..
1119            } => build_source_node(structure, metadata, format!("cycle_source({})", cycle_id)),
1120
1121            HydroNode::Tee { inner, metadata } => {
1122                let ptr = inner.as_ptr();
1123                if let Some(&existing_id) = seen_tees.get(&ptr) {
1124                    return existing_id;
1125                }
1126
1127                let input_id = inner
1128                    .0
1129                    .borrow()
1130                    .build_graph_structure(structure, seen_tees, config);
1131                let tee_id = structure.add_node_with_metadata(
1132                    NodeLabel::Static(extract_op_name(self.print_root())),
1133                    HydroNodeType::Tee,
1134                    metadata,
1135                );
1136
1137                seen_tees.insert(ptr, tee_id);
1138
1139                // Extract semantic tags from input
1140                let inner_borrow = inner.0.borrow();
1141                let input_metadata = inner_borrow.metadata();
1142                add_edge_with_metadata(
1143                    structure,
1144                    input_id,
1145                    tee_id,
1146                    Some(input_metadata),
1147                    Some(metadata),
1148                    None,
1149                );
1150                drop(inner_borrow);
1151
1152                tee_id
1153            }
1154
1155            HydroNode::Partition {
1156                inner, metadata, ..
1157            } => {
1158                let ptr = inner.as_ptr();
1159                if let Some(&existing_id) = seen_tees.get(&ptr) {
1160                    return existing_id;
1161                }
1162
1163                let input_id = inner
1164                    .0
1165                    .borrow()
1166                    .build_graph_structure(structure, seen_tees, config);
1167                let partition_id = structure.add_node_with_metadata(
1168                    NodeLabel::Static(extract_op_name(self.print_root())),
1169                    HydroNodeType::Tee,
1170                    metadata,
1171                );
1172
1173                seen_tees.insert(ptr, partition_id);
1174
1175                // Extract semantic tags from input
1176                let inner_borrow = inner.0.borrow();
1177                let input_metadata = inner_borrow.metadata();
1178                add_edge_with_metadata(
1179                    structure,
1180                    input_id,
1181                    partition_id,
1182                    Some(input_metadata),
1183                    Some(metadata),
1184                    None,
1185                );
1186                drop(inner_borrow);
1187
1188                partition_id
1189            }
1190
1191            // Non-deterministic operation
1192            HydroNode::ObserveNonDet {
1193                inner, metadata, ..
1194            } => build_simple_transform(TransformParams {
1195                structure,
1196                seen_tees,
1197                config,
1198                input: inner,
1199                metadata,
1200                op_name: extract_op_name(self.print_root()),
1201                node_type: HydroNodeType::NonDeterministic,
1202            }),
1203
1204            // Transform operations with Stream edges - grouped by node/edge type
1205            HydroNode::Cast { inner, metadata }
1206            | HydroNode::DeferTick {
1207                input: inner,
1208                metadata,
1209            }
1210            | HydroNode::Enumerate {
1211                input: inner,
1212                metadata,
1213                ..
1214            }
1215            | HydroNode::Unique {
1216                input: inner,
1217                metadata,
1218            }
1219            | HydroNode::ResolveFutures {
1220                input: inner,
1221                metadata,
1222            }
1223            | HydroNode::ResolveFuturesBlocking {
1224                input: inner,
1225                metadata,
1226            }
1227            | HydroNode::ResolveFuturesOrdered {
1228                input: inner,
1229                metadata,
1230            } => build_simple_transform(TransformParams {
1231                structure,
1232                seen_tees,
1233                config,
1234                input: inner,
1235                metadata,
1236                op_name: extract_op_name(self.print_root()),
1237                node_type: HydroNodeType::Transform,
1238            }),
1239
1240            // Aggregation operation - semantic tags extracted from metadata
1241            HydroNode::Sort {
1242                input: inner,
1243                metadata,
1244            } => build_simple_transform(TransformParams {
1245                structure,
1246                seen_tees,
1247                config,
1248                input: inner,
1249                metadata,
1250                op_name: extract_op_name(self.print_root()),
1251                node_type: HydroNodeType::Aggregation,
1252            }),
1253
1254            // Single-expression Transform operations - grouped by node type
1255            HydroNode::Map { f, input, metadata }
1256            | HydroNode::Filter { f, input, metadata }
1257            | HydroNode::FlatMap { f, input, metadata }
1258            | HydroNode::FlatMapStreamBlocking { f, input, metadata }
1259            | HydroNode::FilterMap { f, input, metadata }
1260            | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
1261                TransformParams {
1262                    structure,
1263                    seen_tees,
1264                    config,
1265                    input,
1266                    metadata,
1267                    op_name: extract_op_name(self.print_root()),
1268                    node_type: HydroNodeType::Transform,
1269                },
1270                f,
1271            ),
1272
1273            // Single-expression Aggregation operations - grouped by node type
1274            HydroNode::Reduce { f, input, metadata }
1275            | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
1276                TransformParams {
1277                    structure,
1278                    seen_tees,
1279                    config,
1280                    input,
1281                    metadata,
1282                    op_name: extract_op_name(self.print_root()),
1283                    node_type: HydroNodeType::Aggregation,
1284                },
1285                f,
1286            ),
1287
1288            // Join-like operations with left/right edge labels - grouped by edge labeling
1289            HydroNode::Join {
1290                left,
1291                right,
1292                metadata,
1293            }
1294            | HydroNode::CrossProduct {
1295                left,
1296                right,
1297                metadata,
1298            }
1299            | HydroNode::CrossSingleton {
1300                left,
1301                right,
1302                metadata,
1303            } => {
1304                let left_id = left.build_graph_structure(structure, seen_tees, config);
1305                let right_id = right.build_graph_structure(structure, seen_tees, config);
1306                let node_id = structure.add_node_with_metadata(
1307                    NodeLabel::Static(extract_op_name(self.print_root())),
1308                    HydroNodeType::Join,
1309                    metadata,
1310                );
1311
1312                // Extract semantic tags for left edge
1313                let left_metadata = left.metadata();
1314                add_edge_with_metadata(
1315                    structure,
1316                    left_id,
1317                    node_id,
1318                    Some(left_metadata),
1319                    Some(metadata),
1320                    Some("left".to_owned()),
1321                );
1322
1323                // Extract semantic tags for right edge
1324                let right_metadata = right.metadata();
1325                add_edge_with_metadata(
1326                    structure,
1327                    right_id,
1328                    node_id,
1329                    Some(right_metadata),
1330                    Some(metadata),
1331                    Some("right".to_owned()),
1332                );
1333
1334                node_id
1335            }
1336
1337            // Join-like operations with pos/neg edge labels - grouped by edge labeling
1338            HydroNode::Difference {
1339                pos: left,
1340                neg: right,
1341                metadata,
1342            }
1343            | HydroNode::AntiJoin {
1344                pos: left,
1345                neg: right,
1346                metadata,
1347            } => {
1348                let left_id = left.build_graph_structure(structure, seen_tees, config);
1349                let right_id = right.build_graph_structure(structure, seen_tees, config);
1350                let node_id = structure.add_node_with_metadata(
1351                    NodeLabel::Static(extract_op_name(self.print_root())),
1352                    HydroNodeType::Join,
1353                    metadata,
1354                );
1355
1356                // Extract semantic tags for pos edge
1357                let left_metadata = left.metadata();
1358                add_edge_with_metadata(
1359                    structure,
1360                    left_id,
1361                    node_id,
1362                    Some(left_metadata),
1363                    Some(metadata),
1364                    Some("pos".to_owned()),
1365                );
1366
1367                // Extract semantic tags for neg edge
1368                let right_metadata = right.metadata();
1369                add_edge_with_metadata(
1370                    structure,
1371                    right_id,
1372                    node_id,
1373                    Some(right_metadata),
1374                    Some(metadata),
1375                    Some("neg".to_owned()),
1376                );
1377
1378                node_id
1379            }
1380
1381            // Dual expression transforms - consolidated using pattern matching
1382            HydroNode::Fold {
1383                init,
1384                acc,
1385                input,
1386                metadata,
1387            }
1388            | HydroNode::FoldKeyed {
1389                init,
1390                acc,
1391                input,
1392                metadata,
1393            }
1394            | HydroNode::Scan {
1395                init,
1396                acc,
1397                input,
1398                metadata,
1399            } => {
1400                let node_type = HydroNodeType::Aggregation; // All are aggregation operations
1401
1402                build_dual_expr_transform(
1403                    TransformParams {
1404                        structure,
1405                        seen_tees,
1406                        config,
1407                        input,
1408                        metadata,
1409                        op_name: extract_op_name(self.print_root()),
1410                        node_type,
1411                    },
1412                    init,
1413                    acc,
1414                )
1415            }
1416
1417            // Combination of join and transform
1418            HydroNode::ReduceKeyedWatermark {
1419                f,
1420                input,
1421                watermark,
1422                metadata,
1423            } => {
1424                let input_id = input.build_graph_structure(structure, seen_tees, config);
1425                let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
1426                let location_key = Some(setup_location(structure, metadata));
1427                let join_node_id = structure.add_node_with_backtrace(
1428                    NodeLabel::Static(extract_op_name(self.print_root())),
1429                    HydroNodeType::Join,
1430                    location_key,
1431                    Some(metadata.op.backtrace.clone()),
1432                );
1433
1434                // Extract semantic tags for input edge
1435                let input_metadata = input.metadata();
1436                add_edge_with_metadata(
1437                    structure,
1438                    input_id,
1439                    join_node_id,
1440                    Some(input_metadata),
1441                    Some(metadata),
1442                    Some("input".to_owned()),
1443                );
1444
1445                // Extract semantic tags for watermark edge
1446                let watermark_metadata = watermark.metadata();
1447                add_edge_with_metadata(
1448                    structure,
1449                    watermark_id,
1450                    join_node_id,
1451                    Some(watermark_metadata),
1452                    Some(metadata),
1453                    Some("watermark".to_owned()),
1454                );
1455
1456                let node_id = structure.add_node_with_backtrace(
1457                    NodeLabel::with_exprs(extract_op_name(self.print_root()), vec![f.clone()]),
1458                    HydroNodeType::Aggregation,
1459                    location_key,
1460                    Some(metadata.op.backtrace.clone()),
1461                );
1462
1463                // Edge from join to aggregation node
1464                let join_metadata = metadata; // Use the same metadata
1465                add_edge_with_metadata(
1466                    structure,
1467                    join_node_id,
1468                    node_id,
1469                    Some(join_metadata),
1470                    Some(metadata),
1471                    None,
1472                );
1473
1474                node_id
1475            }
1476
1477            HydroNode::Network {
1478                serialize_fn,
1479                deserialize_fn,
1480                input,
1481                metadata,
1482                ..
1483            } => {
1484                let input_id = input.build_graph_structure(structure, seen_tees, config);
1485                let _from_location_key = setup_location(structure, metadata);
1486
1487                let root = metadata.location_id.root();
1488                let to_location_key = root.key();
1489                let to_location_type = root.location_type().unwrap();
1490                structure.add_location(to_location_key, to_location_type);
1491
1492                let mut label = "network(".to_owned();
1493                if serialize_fn.is_some() {
1494                    label.push_str("send");
1495                }
1496                if deserialize_fn.is_some() {
1497                    if serialize_fn.is_some() {
1498                        label.push_str(" + ");
1499                    }
1500                    label.push_str("recv");
1501                }
1502                label.push(')');
1503
1504                let network_id = structure.add_node_with_backtrace(
1505                    NodeLabel::Static(label),
1506                    HydroNodeType::Network,
1507                    Some(to_location_key),
1508                    Some(metadata.op.backtrace.clone()),
1509                );
1510
1511                // Extract semantic tags for network edge
1512                let input_metadata = input.metadata();
1513                add_edge_with_metadata(
1514                    structure,
1515                    input_id,
1516                    network_id,
1517                    Some(input_metadata),
1518                    Some(metadata),
1519                    Some(format!("to {:?}({})", to_location_type, to_location_key)),
1520                );
1521
1522                network_id
1523            }
1524
1525            // Non-deterministic batch operation
1526            HydroNode::Batch { inner, metadata } => build_simple_transform(TransformParams {
1527                structure,
1528                seen_tees,
1529                config,
1530                input: inner,
1531                metadata,
1532                op_name: extract_op_name(self.print_root()),
1533                node_type: HydroNodeType::NonDeterministic,
1534            }),
1535
1536            HydroNode::YieldConcat { inner, .. } => {
1537                // Unpersist is typically optimized away, just pass through
1538                inner.build_graph_structure(structure, seen_tees, config)
1539            }
1540
1541            HydroNode::BeginAtomic { inner, .. } => {
1542                inner.build_graph_structure(structure, seen_tees, config)
1543            }
1544
1545            HydroNode::EndAtomic { inner, .. } => {
1546                inner.build_graph_structure(structure, seen_tees, config)
1547            }
1548
1549            HydroNode::Chain {
1550                first,
1551                second,
1552                metadata,
1553            } => {
1554                let first_id = first.build_graph_structure(structure, seen_tees, config);
1555                let second_id = second.build_graph_structure(structure, seen_tees, config);
1556                let location_key = Some(setup_location(structure, metadata));
1557                let chain_id = structure.add_node_with_backtrace(
1558                    NodeLabel::Static(extract_op_name(self.print_root())),
1559                    HydroNodeType::Transform,
1560                    location_key,
1561                    Some(metadata.op.backtrace.clone()),
1562                );
1563
1564                // Extract semantic tags for first edge
1565                let first_metadata = first.metadata();
1566                add_edge_with_metadata(
1567                    structure,
1568                    first_id,
1569                    chain_id,
1570                    Some(first_metadata),
1571                    Some(metadata),
1572                    Some("first".to_owned()),
1573                );
1574
1575                // Extract semantic tags for second edge
1576                let second_metadata = second.metadata();
1577                add_edge_with_metadata(
1578                    structure,
1579                    second_id,
1580                    chain_id,
1581                    Some(second_metadata),
1582                    Some(metadata),
1583                    Some("second".to_owned()),
1584                );
1585
1586                chain_id
1587            }
1588
1589            HydroNode::ChainFirst {
1590                first,
1591                second,
1592                metadata,
1593            } => {
1594                let first_id = first.build_graph_structure(structure, seen_tees, config);
1595                let second_id = second.build_graph_structure(structure, seen_tees, config);
1596                let location_key = Some(setup_location(structure, metadata));
1597                let chain_id = structure.add_node_with_backtrace(
1598                    NodeLabel::Static(extract_op_name(self.print_root())),
1599                    HydroNodeType::Transform,
1600                    location_key,
1601                    Some(metadata.op.backtrace.clone()),
1602                );
1603
1604                // Extract semantic tags for first edge
1605                let first_metadata = first.metadata();
1606                add_edge_with_metadata(
1607                    structure,
1608                    first_id,
1609                    chain_id,
1610                    Some(first_metadata),
1611                    Some(metadata),
1612                    Some("first".to_owned()),
1613                );
1614
1615                // Extract semantic tags for second edge
1616                let second_metadata = second.metadata();
1617                add_edge_with_metadata(
1618                    structure,
1619                    second_id,
1620                    chain_id,
1621                    Some(second_metadata),
1622                    Some(metadata),
1623                    Some("second".to_owned()),
1624                );
1625
1626                chain_id
1627            }
1628
1629            HydroNode::Counter {
1630                tag: _,
1631                prefix: _,
1632                duration,
1633                input,
1634                metadata,
1635            } => build_single_expr_transform(
1636                TransformParams {
1637                    structure,
1638                    seen_tees,
1639                    config,
1640                    input,
1641                    metadata,
1642                    op_name: extract_op_name(self.print_root()),
1643                    node_type: HydroNodeType::Transform,
1644                },
1645                duration,
1646            ),
1647        }
1648    }
1649}
1650
1651/// Utility functions for rendering multiple roots as a single graph.
1652/// Macro to reduce duplication in render functions.
1653macro_rules! render_hydro_ir {
1654    ($name:ident, $write_fn:ident) => {
1655        pub fn $name(roots: &[HydroRoot], config: HydroWriteConfig<'_>) -> String {
1656            let mut output = String::new();
1657            $write_fn(&mut output, roots, config).unwrap();
1658            output
1659        }
1660    };
1661}
1662
1663/// Macro to reduce duplication in write functions.
1664macro_rules! write_hydro_ir {
1665    ($name:ident, $writer_type:ty, $constructor:expr) => {
1666        pub fn $name(
1667            output: impl std::fmt::Write,
1668            roots: &[HydroRoot],
1669            config: HydroWriteConfig<'_>,
1670        ) -> std::fmt::Result {
1671            let mut graph_write: $writer_type = $constructor(output, config);
1672            write_hydro_ir_graph(&mut graph_write, roots, config)
1673        }
1674    };
1675}
1676
1677render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1678write_hydro_ir!(
1679    write_hydro_ir_mermaid,
1680    HydroMermaid<_>,
1681    HydroMermaid::new_with_config
1682);
1683
1684render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1685write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1686
1687// Legacy hydroscope function - now uses HydroJson for consistency
1688render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);
1689
1690// JSON rendering
1691render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
1692write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
1693
1694fn write_hydro_ir_graph<W>(
1695    graph_write: W,
1696    roots: &[HydroRoot],
1697    config: HydroWriteConfig<'_>,
1698) -> Result<(), W::Err>
1699where
1700    W: HydroGraphWrite,
1701{
1702    let mut structure = HydroGraphStructure::new();
1703    let mut seen_tees = HashMap::new();
1704
1705    // Build the graph structure for all roots
1706    for leaf in roots {
1707        leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1708    }
1709
1710    write_graph_structure(&structure, graph_write, config)
1711}