Skip to main content

hydro_lang/viz/
render.rs

1use std::collections::{HashMap, HashSet};
2use std::error::Error;
3use std::fmt::{Display, Write};
4use std::num::ParseIntError;
5use std::sync::OnceLock;
6
7use auto_impl::auto_impl;
8use slotmap::{Key, SecondaryMap, SlotMap};
9
10pub use super::graphviz::{HydroDot, escape_dot};
11pub use super::json::HydroJson;
12// Re-export specific implementations
13pub use super::mermaid::{HydroMermaid, escape_mermaid};
14use crate::compile::ir::backtrace::Backtrace;
15use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
16use crate::location::dynamic::LocationId;
17use crate::location::{LocationKey, LocationType};
18
19/// Label for a graph node - can be either a static string or contain expressions.
20#[derive(Debug, Clone)]
21pub enum NodeLabel {
22    /// A static string label
23    Static(String),
24    /// A label with an operation name and expression arguments
25    WithExprs {
26        op_name: String,
27        exprs: Vec<DebugExpr>,
28    },
29}
30
31impl NodeLabel {
32    /// Create a static label
33    pub fn static_label(s: String) -> Self {
34        Self::Static(s)
35    }
36
37    /// Create a label for an operation with multiple expression
38    pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
39        Self::WithExprs { op_name, exprs }
40    }
41}
42
43impl Display for NodeLabel {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self {
46            Self::Static(s) => write!(f, "{}", s),
47            Self::WithExprs { op_name, exprs } => {
48                if exprs.is_empty() {
49                    write!(f, "{}()", op_name)
50                } else {
51                    let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
52                    write!(f, "{}({})", op_name, expr_strs.join(", "))
53                }
54            }
55        }
56    }
57}
58
59/// Base struct for text-based graph writers that use indentation.
60/// Contains common fields shared by DOT and Mermaid writers.
61pub struct IndentedGraphWriter<'a, W> {
62    pub write: W,
63    pub indent: usize,
64    pub config: HydroWriteConfig<'a>,
65}
66
67impl<'a, W> IndentedGraphWriter<'a, W> {
68    /// Create a new writer with default configuration.
69    pub fn new(write: W) -> Self {
70        Self {
71            write,
72            indent: 0,
73            config: HydroWriteConfig::default(),
74        }
75    }
76
77    /// Create a new writer with the given configuration.
78    pub fn new_with_config(write: W, config: HydroWriteConfig<'a>) -> Self {
79        Self {
80            write,
81            indent: 0,
82            config,
83        }
84    }
85}
86
87impl<W: Write> IndentedGraphWriter<'_, W> {
88    /// Write an indented line using the current indentation level.
89    pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
90        writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
91    }
92}
93
94/// Common error type used by all graph writers.
95pub type GraphWriteError = std::fmt::Error;
96
97/// Trait for writing textual representations of Hydro IR graphs, i.e. mermaid or dot graphs.
98#[auto_impl(&mut, Box)]
99pub trait HydroGraphWrite {
100    /// Error type emitted by writing.
101    type Err: Error;
102
103    /// Begin the graph. First method called.
104    fn write_prologue(&mut self) -> Result<(), Self::Err>;
105
106    /// Write a node definition with styling.
107    fn write_node_definition(
108        &mut self,
109        node_id: VizNodeKey,
110        node_label: &NodeLabel,
111        node_type: HydroNodeType,
112        location_key: Option<LocationKey>,
113        location_type: Option<LocationType>,
114        backtrace: Option<&Backtrace>,
115    ) -> Result<(), Self::Err>;
116
117    /// Write an edge between nodes with optional labeling.
118    fn write_edge(
119        &mut self,
120        src_id: VizNodeKey,
121        dst_id: VizNodeKey,
122        edge_properties: &HashSet<HydroEdgeProp>,
123        label: Option<&str>,
124    ) -> Result<(), Self::Err>;
125
126    /// Begin writing a location grouping (process/cluster).
127    fn write_location_start(
128        &mut self,
129        location_key: LocationKey,
130        location_type: LocationType,
131    ) -> Result<(), Self::Err>;
132
133    /// Write a node within a location.
134    fn write_node(&mut self, node_id: VizNodeKey) -> Result<(), Self::Err>;
135
136    /// End writing a location grouping.
137    fn write_location_end(&mut self) -> Result<(), Self::Err>;
138
139    /// End the graph. Last method called.
140    fn write_epilogue(&mut self) -> Result<(), Self::Err>;
141}
142
143/// Node type utilities - centralized handling of HydroNodeType operations
144pub mod node_type_utils {
145    use super::HydroNodeType;
146
147    /// All node types with their string names
148    const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
149        (HydroNodeType::Source, "Source"),
150        (HydroNodeType::Transform, "Transform"),
151        (HydroNodeType::Join, "Join"),
152        (HydroNodeType::Aggregation, "Aggregation"),
153        (HydroNodeType::Network, "Network"),
154        (HydroNodeType::Sink, "Sink"),
155        (HydroNodeType::Tee, "Tee"),
156        (HydroNodeType::NonDeterministic, "NonDeterministic"),
157    ];
158
159    /// Convert HydroNodeType to string representation (used by JSON format)
160    pub fn to_string(node_type: HydroNodeType) -> &'static str {
161        NODE_TYPE_DATA
162            .iter()
163            .find(|(nt, _)| *nt == node_type)
164            .map(|(_, name)| *name)
165            .unwrap_or("Unknown")
166    }
167
168    /// Get all node types with their string representations (used by JSON format)
169    pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
170        NODE_TYPE_DATA.to_vec()
171    }
172}
173
174/// Types of nodes in Hydro IR for styling purposes.
175#[derive(Debug, Clone, Copy, PartialEq, Eq)]
176pub enum HydroNodeType {
177    Source,
178    Transform,
179    Join,
180    Aggregation,
181    Network,
182    Sink,
183    Tee,
184    NonDeterministic,
185}
186
187/// Types of edges in Hydro IR representing stream properties.
188#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
189pub enum HydroEdgeProp {
190    Bounded,
191    Unbounded,
192    TotalOrder,
193    NoOrder,
194    Keyed,
195    // Collection type tags for styling
196    Stream,
197    KeyedSingleton,
198    KeyedStream,
199    Singleton,
200    Optional,
201    Network,
202    Cycle,
203}
204
205/// Unified edge style representation for all graph formats.
206/// This intermediate format allows consistent styling across JSON, DOT, and Mermaid.
207#[derive(Debug, Clone, PartialEq, Eq)]
208pub struct UnifiedEdgeStyle {
209    /// Line pattern (solid, dashed)
210    pub line_pattern: LinePattern,
211    /// Line width (1 = thin, 3 = thick)
212    pub line_width: u8,
213    /// Arrowhead style
214    pub arrowhead: ArrowheadStyle,
215    /// Line style (single plain line, or line with hash marks/dots for keyed streams)
216    pub line_style: LineStyle,
217    /// Halo/background effect for boundedness
218    pub halo: HaloStyle,
219    /// Line waviness for ordering information
220    pub waviness: WavinessStyle,
221    /// Whether animation is enabled (JSON only)
222    pub animation: AnimationStyle,
223    /// Color for the edge
224    pub color: &'static str,
225}
226
227#[derive(Debug, Clone, Copy, PartialEq, Eq)]
228pub enum LinePattern {
229    Solid,
230    Dotted,
231    Dashed,
232}
233
234#[derive(Debug, Clone, Copy, PartialEq, Eq)]
235pub enum ArrowheadStyle {
236    TriangleFilled,
237    CircleFilled,
238    DiamondOpen,
239    Default,
240}
241
242#[derive(Debug, Clone, Copy, PartialEq, Eq)]
243pub enum LineStyle {
244    /// Plain single line
245    Single,
246    /// Single line with hash marks/dots (for keyed streams)
247    HashMarks,
248}
249
250#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251pub enum HaloStyle {
252    None,
253    LightBlue,
254}
255
256#[derive(Debug, Clone, Copy, PartialEq, Eq)]
257pub enum WavinessStyle {
258    None,
259    Wavy,
260}
261
262#[derive(Debug, Clone, Copy, PartialEq, Eq)]
263pub enum AnimationStyle {
264    Static,
265    Animated,
266}
267
268impl Default for UnifiedEdgeStyle {
269    fn default() -> Self {
270        Self {
271            line_pattern: LinePattern::Solid,
272            line_width: 1,
273            arrowhead: ArrowheadStyle::Default,
274            line_style: LineStyle::Single,
275            halo: HaloStyle::None,
276            waviness: WavinessStyle::None,
277            animation: AnimationStyle::Static,
278            color: "#666666",
279        }
280    }
281}
282
283/// Convert HydroEdgeType properties to unified edge style.
284/// This is the core logic for determining edge visual properties.
285///
286/// # Visual Encoding Mapping
287///
288/// | Semantic Property | Visual Channel | Values |
289/// |------------------|----------------|---------|
290/// | Network | Line Pattern + Animation | Local (solid, static), Network (dashed, animated) |
291/// | Ordering | Waviness | TotalOrder (straight), NoOrder (wavy) |
292/// | Boundedness | Halo | Bounded (none), Unbounded (light-blue transparent) |
293/// | Keyedness | Line Style | NotKeyed (plain line), Keyed (line with hash marks/dots) |
294/// | Collection Type | Color + Arrowhead | Stream (blue #2563eb, triangle), Singleton (black, circle), Optional (gray, diamond) |
295pub fn get_unified_edge_style(
296    edge_properties: &HashSet<HydroEdgeProp>,
297    src_location: Option<usize>,
298    dst_location: Option<usize>,
299) -> UnifiedEdgeStyle {
300    let mut style = UnifiedEdgeStyle::default();
301
302    // Network communication group - controls line pattern AND animation
303    let is_network = edge_properties.contains(&HydroEdgeProp::Network)
304        || (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
305
306    if is_network {
307        style.line_pattern = LinePattern::Dashed;
308        style.animation = AnimationStyle::Animated;
309    } else {
310        style.line_pattern = LinePattern::Solid;
311        style.animation = AnimationStyle::Static;
312    }
313
314    // Boundedness group - controls halo
315    if edge_properties.contains(&HydroEdgeProp::Unbounded) {
316        style.halo = HaloStyle::LightBlue;
317    } else {
318        style.halo = HaloStyle::None;
319    }
320
321    // Collection type group - controls arrowhead and color
322    if edge_properties.contains(&HydroEdgeProp::Stream) {
323        style.arrowhead = ArrowheadStyle::TriangleFilled;
324        style.color = "#2563eb"; // Bright blue for Stream
325    } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
326        style.arrowhead = ArrowheadStyle::TriangleFilled;
327        style.color = "#2563eb"; // Bright blue for Stream (keyed variant)
328    } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
329        style.arrowhead = ArrowheadStyle::TriangleFilled;
330        style.color = "#000000"; // Black for Singleton (keyed variant)
331    } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
332        style.arrowhead = ArrowheadStyle::CircleFilled;
333        style.color = "#000000"; // Black for Singleton
334    } else if edge_properties.contains(&HydroEdgeProp::Optional) {
335        style.arrowhead = ArrowheadStyle::DiamondOpen;
336        style.color = "#6b7280"; // Gray for Optional
337    }
338
339    // Keyedness group - controls hash marks on the line
340    if edge_properties.contains(&HydroEdgeProp::Keyed) {
341        style.line_style = LineStyle::HashMarks; // Renders as hash marks/dots on the line in hydroscope
342    } else {
343        style.line_style = LineStyle::Single;
344    }
345
346    // Ordering group - waviness channel
347    if edge_properties.contains(&HydroEdgeProp::NoOrder) {
348        style.waviness = WavinessStyle::Wavy;
349    } else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
350        style.waviness = WavinessStyle::None;
351    }
352
353    style
354}
355
356/// Extract semantic edge properties from CollectionKind metadata.
357/// This function analyzes the collection type and extracts relevant semantic tags
358/// for visualization purposes.
359pub fn extract_edge_properties_from_collection_kind(
360    collection_kind: &crate::compile::ir::CollectionKind,
361) -> HashSet<HydroEdgeProp> {
362    use crate::compile::ir::CollectionKind;
363
364    let mut properties = HashSet::new();
365
366    match collection_kind {
367        CollectionKind::Stream { bound, order, .. } => {
368            properties.insert(HydroEdgeProp::Stream);
369            add_bound_property(&mut properties, bound);
370            add_order_property(&mut properties, order);
371        }
372        CollectionKind::KeyedStream {
373            bound, value_order, ..
374        } => {
375            properties.insert(HydroEdgeProp::KeyedStream);
376            properties.insert(HydroEdgeProp::Keyed);
377            add_bound_property(&mut properties, bound);
378            add_order_property(&mut properties, value_order);
379        }
380        CollectionKind::Singleton { bound, .. } => {
381            properties.insert(HydroEdgeProp::Singleton);
382            add_bound_property(&mut properties, bound);
383            // Singletons have implicit TotalOrder
384            properties.insert(HydroEdgeProp::TotalOrder);
385        }
386        CollectionKind::Optional { bound, .. } => {
387            properties.insert(HydroEdgeProp::Optional);
388            add_bound_property(&mut properties, bound);
389            // Optionals have implicit TotalOrder
390            properties.insert(HydroEdgeProp::TotalOrder);
391        }
392        CollectionKind::KeyedSingleton { bound, .. } => {
393            properties.insert(HydroEdgeProp::Singleton);
394            properties.insert(HydroEdgeProp::Keyed);
395            // KeyedSingletons boundedness depends on the bound kind
396            add_keyed_singleton_bound_property(&mut properties, bound);
397            properties.insert(HydroEdgeProp::TotalOrder);
398        }
399    }
400
401    properties
402}
403
404/// Helper function to add bound property based on BoundKind.
405fn add_bound_property(
406    properties: &mut HashSet<HydroEdgeProp>,
407    bound: &crate::compile::ir::BoundKind,
408) {
409    use crate::compile::ir::BoundKind;
410
411    match bound {
412        BoundKind::Bounded => {
413            properties.insert(HydroEdgeProp::Bounded);
414        }
415        BoundKind::Unbounded => {
416            properties.insert(HydroEdgeProp::Unbounded);
417        }
418    }
419}
420
421/// Helper function to add bound property for KeyedSingleton based on KeyedSingletonBoundKind.
422fn add_keyed_singleton_bound_property(
423    properties: &mut HashSet<HydroEdgeProp>,
424    bound: &crate::compile::ir::KeyedSingletonBoundKind,
425) {
426    use crate::compile::ir::KeyedSingletonBoundKind;
427
428    match bound {
429        KeyedSingletonBoundKind::Bounded | KeyedSingletonBoundKind::BoundedValue => {
430            properties.insert(HydroEdgeProp::Bounded);
431        }
432        KeyedSingletonBoundKind::Unbounded => {
433            properties.insert(HydroEdgeProp::Unbounded);
434        }
435    }
436}
437
438/// Helper function to add order property based on StreamOrder.
439fn add_order_property(
440    properties: &mut HashSet<HydroEdgeProp>,
441    order: &crate::compile::ir::StreamOrder,
442) {
443    use crate::compile::ir::StreamOrder;
444
445    match order {
446        StreamOrder::TotalOrder => {
447            properties.insert(HydroEdgeProp::TotalOrder);
448        }
449        StreamOrder::NoOrder => {
450            properties.insert(HydroEdgeProp::NoOrder);
451        }
452    }
453}
454
455/// Detect if an edge crosses network boundaries by comparing source and destination locations.
456/// Returns true if the edge represents network communication between different locations.
457pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
458    // Compare the root locations to determine if they differ
459    src_location.root() != dst_location.root()
460}
461
462/// Add network edge tag if source and destination locations differ.
463pub fn add_network_edge_tag(
464    properties: &mut HashSet<HydroEdgeProp>,
465    src_location: &LocationId,
466    dst_location: &LocationId,
467) {
468    if is_network_edge(src_location, dst_location) {
469        properties.insert(HydroEdgeProp::Network);
470    }
471}
472
473/// Configuration for graph writing.
474#[derive(Debug, Clone, Copy)]
475pub struct HydroWriteConfig<'a> {
476    pub show_metadata: bool,
477    pub show_location_groups: bool,
478    pub use_short_labels: bool,
479    pub location_names: &'a SecondaryMap<LocationKey, String>,
480}
481
482impl Default for HydroWriteConfig<'_> {
483    fn default() -> Self {
484        static EMPTY: OnceLock<SecondaryMap<LocationKey, String>> = OnceLock::new();
485        Self {
486            show_metadata: false,
487            show_location_groups: true,
488            use_short_labels: true, // Default to short labels for all renderers
489            location_names: EMPTY.get_or_init(SecondaryMap::new),
490        }
491    }
492}
493
494/// Node information in the Hydro graph.
495#[derive(Clone)]
496pub struct HydroGraphNode {
497    pub label: NodeLabel,
498    pub node_type: HydroNodeType,
499    pub location_key: Option<LocationKey>,
500    pub backtrace: Option<Backtrace>,
501}
502
503slotmap::new_key_type! {
504    /// Unique identifier for nodes in the visualization graph.
505    ///
506    /// This is counted/allocated separately from any other IDs within `hydro_lang`.
507    pub struct VizNodeKey;
508}
509
510impl Display for VizNodeKey {
511    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512        write!(f, "viz{:?}", self.data()) // `"viz1v1"``
513    }
514}
515
516/// This is used by the visualizer
517/// TODO(mingwei): Make this more robust?
518impl std::str::FromStr for VizNodeKey {
519    type Err = Option<ParseIntError>;
520
521    fn from_str(s: &str) -> Result<Self, Self::Err> {
522        let nvn = s.strip_prefix("viz").ok_or(None)?;
523        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
524        let idx: u64 = idx.parse()?;
525        let ver: u64 = ver.parse()?;
526        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
527    }
528}
529
530impl VizNodeKey {
531    /// A key for testing with index 1.
532    #[cfg(test)]
533    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x0000008f00000001)); // `1v143`
534
535    /// A key for testing with index 2.
536    #[cfg(test)]
537    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x0000008f00000002)); // `2v143`
538}
539
540/// Edge information in the Hydro graph.
541#[derive(Debug, Clone)]
542pub struct HydroGraphEdge {
543    pub src: VizNodeKey,
544    pub dst: VizNodeKey,
545    pub edge_properties: HashSet<HydroEdgeProp>,
546    pub label: Option<String>,
547}
548
549/// Graph structure tracker for Hydro IR rendering.
550#[derive(Default)]
551pub struct HydroGraphStructure {
552    pub nodes: SlotMap<VizNodeKey, HydroGraphNode>,
553    pub edges: Vec<HydroGraphEdge>,
554    pub locations: SecondaryMap<LocationKey, LocationType>,
555}
556
557impl HydroGraphStructure {
558    pub fn new() -> Self {
559        Self::default()
560    }
561
562    pub fn add_node(
563        &mut self,
564        label: NodeLabel,
565        node_type: HydroNodeType,
566        location_key: Option<LocationKey>,
567    ) -> VizNodeKey {
568        self.add_node_with_backtrace(label, node_type, location_key, None)
569    }
570
571    pub fn add_node_with_backtrace(
572        &mut self,
573        label: NodeLabel,
574        node_type: HydroNodeType,
575        location_key: Option<LocationKey>,
576        backtrace: Option<Backtrace>,
577    ) -> VizNodeKey {
578        self.nodes.insert(HydroGraphNode {
579            label,
580            node_type,
581            location_key,
582            backtrace,
583        })
584    }
585
586    /// Add a node with metadata, extracting backtrace automatically
587    pub fn add_node_with_metadata(
588        &mut self,
589        label: NodeLabel,
590        node_type: HydroNodeType,
591        metadata: &HydroIrMetadata,
592    ) -> VizNodeKey {
593        let location_key = Some(setup_location(self, metadata));
594        let backtrace = Some(metadata.op.backtrace.clone());
595        self.add_node_with_backtrace(label, node_type, location_key, backtrace)
596    }
597
598    pub fn add_edge(
599        &mut self,
600        src: VizNodeKey,
601        dst: VizNodeKey,
602        edge_properties: HashSet<HydroEdgeProp>,
603        label: Option<String>,
604    ) {
605        self.edges.push(HydroGraphEdge {
606            src,
607            dst,
608            edge_properties,
609            label,
610        });
611    }
612
613    // Legacy method for backward compatibility
614    pub fn add_edge_single(
615        &mut self,
616        src: VizNodeKey,
617        dst: VizNodeKey,
618        edge_type: HydroEdgeProp,
619        label: Option<String>,
620    ) {
621        let mut properties = HashSet::new();
622        properties.insert(edge_type);
623        self.edges.push(HydroGraphEdge {
624            src,
625            dst,
626            edge_properties: properties,
627            label,
628        });
629    }
630
631    pub fn add_location(&mut self, location_key: LocationKey, location_type: LocationType) {
632        self.locations.insert(location_key, location_type);
633    }
634}
635
636/// Function to extract an op_name from a print_root() result for use in labels.
637pub fn extract_op_name(full_label: String) -> String {
638    full_label
639        .split('(')
640        .next()
641        .unwrap_or("unknown")
642        .to_lowercase()
643}
644
645/// Extract a short, readable label from the full token stream label using print_root() style naming
646pub fn extract_short_label(full_label: &str) -> String {
647    // Use the same logic as extract_op_name but handle the specific cases we need for UI display
648    if let Some(op_name) = full_label.split('(').next() {
649        let base_name = op_name.to_lowercase();
650        match base_name.as_str() {
651            // Handle special cases for UI display
652            "source" => {
653                if full_label.contains("Iter") {
654                    "source_iter".to_owned()
655                } else if full_label.contains("Stream") {
656                    "source_stream".to_owned()
657                } else if full_label.contains("ExternalNetwork") {
658                    "external_network".to_owned()
659                } else if full_label.contains("Spin") {
660                    "spin".to_owned()
661                } else {
662                    "source".to_owned()
663                }
664            }
665            "network" => {
666                if full_label.contains("deser") {
667                    "network(recv)".to_owned()
668                } else if full_label.contains("ser") {
669                    "network(send)".to_owned()
670                } else {
671                    "network".to_owned()
672                }
673            }
674            // For all other cases, just use the lowercase base name (same as extract_op_name)
675            _ => base_name,
676        }
677    } else {
678        // Fallback for labels that don't follow the pattern
679        if full_label.len() > 20 {
680            format!("{}...", &full_label[..17])
681        } else {
682            full_label.to_owned()
683        }
684    }
685}
686
687/// Helper function to set up location in structure from metadata.
688fn setup_location(structure: &mut HydroGraphStructure, metadata: &HydroIrMetadata) -> LocationKey {
689    let root = metadata.location_id.root();
690    let location_key = root.key();
691    let location_type = root.location_type().unwrap();
692    structure.add_location(location_key, location_type);
693    location_key
694}
695
696/// Helper function to add an edge with semantic tags extracted from metadata.
697/// This function combines collection kind extraction with network detection.
698fn add_edge_with_metadata(
699    structure: &mut HydroGraphStructure,
700    src_id: VizNodeKey,
701    dst_id: VizNodeKey,
702    src_metadata: Option<&HydroIrMetadata>,
703    dst_metadata: Option<&HydroIrMetadata>,
704    label: Option<String>,
705) {
706    let mut properties = HashSet::new();
707
708    // Extract semantic tags from source metadata's collection kind
709    if let Some(metadata) = src_metadata {
710        properties.extend(extract_edge_properties_from_collection_kind(
711            &metadata.collection_kind,
712        ));
713    }
714
715    // Add network edge tag if locations differ
716    if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
717        add_network_edge_tag(
718            &mut properties,
719            &src_meta.location_id,
720            &dst_meta.location_id,
721        );
722    }
723
724    // If no properties were extracted, default to Stream
725    if properties.is_empty() {
726        properties.insert(HydroEdgeProp::Stream);
727    }
728
729    structure.add_edge(src_id, dst_id, properties, label);
730}
731
732/// Helper function to write a graph structure using any GraphWrite implementation
733fn write_graph_structure<W>(
734    structure: &HydroGraphStructure,
735    graph_write: W,
736    config: HydroWriteConfig<'_>,
737) -> Result<(), W::Err>
738where
739    W: HydroGraphWrite,
740{
741    let mut graph_write = graph_write;
742    // Write the graph
743    graph_write.write_prologue()?;
744
745    // Write node definitions
746    for (node_id, node) in structure.nodes.iter() {
747        let location_type = node
748            .location_key
749            .and_then(|loc_key| structure.locations.get(loc_key))
750            .copied();
751
752        graph_write.write_node_definition(
753            node_id,
754            &node.label,
755            node.node_type,
756            node.location_key,
757            location_type,
758            node.backtrace.as_ref(),
759        )?;
760    }
761
762    // Group nodes by location if requested
763    if config.show_location_groups {
764        let mut nodes_by_location = SecondaryMap::<LocationKey, Vec<VizNodeKey>>::new();
765        for (node_id, node) in structure.nodes.iter() {
766            if let Some(location_key) = node.location_key {
767                nodes_by_location
768                    .entry(location_key)
769                    .expect("location was removed")
770                    .or_default()
771                    .push(node_id);
772            }
773        }
774
775        for (location_key, node_ids) in nodes_by_location.iter() {
776            if let Some(&location_type) = structure.locations.get(location_key) {
777                graph_write.write_location_start(location_key, location_type)?;
778                for &node_id in node_ids.iter() {
779                    graph_write.write_node(node_id)?;
780                }
781                graph_write.write_location_end()?;
782            }
783        }
784    }
785
786    // Write edges
787    for edge in structure.edges.iter() {
788        graph_write.write_edge(
789            edge.src,
790            edge.dst,
791            &edge.edge_properties,
792            edge.label.as_deref(),
793        )?;
794    }
795
796    graph_write.write_epilogue()?;
797    Ok(())
798}
799
800impl HydroRoot {
801    /// Build the graph structure by traversing the IR tree.
802    pub fn build_graph_structure(
803        &self,
804        structure: &mut HydroGraphStructure,
805        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
806        config: HydroWriteConfig<'_>,
807    ) -> VizNodeKey {
808        // Helper function for sink nodes to reduce duplication
809        fn build_sink_node(
810            structure: &mut HydroGraphStructure,
811            seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
812            config: HydroWriteConfig<'_>,
813            input: &HydroNode,
814            sink_metadata: Option<&HydroIrMetadata>,
815            label: NodeLabel,
816        ) -> VizNodeKey {
817            let input_id = input.build_graph_structure(structure, seen_tees, config);
818
819            // If no explicit metadata is provided, extract it from the input node
820            let effective_metadata = if let Some(meta) = sink_metadata {
821                Some(meta)
822            } else {
823                match input {
824                    HydroNode::Placeholder => None,
825                    // All other variants have metadata
826                    _ => Some(input.metadata()),
827                }
828            };
829
830            let location_key = effective_metadata.map(|m| setup_location(structure, m));
831            let sink_id = structure.add_node_with_backtrace(
832                label,
833                HydroNodeType::Sink,
834                location_key,
835                effective_metadata.map(|m| m.op.backtrace.clone()),
836            );
837
838            // Extract semantic tags from input metadata
839            let input_metadata = input.metadata();
840            add_edge_with_metadata(
841                structure,
842                input_id,
843                sink_id,
844                Some(input_metadata),
845                sink_metadata,
846                None,
847            );
848
849            sink_id
850        }
851
852        match self {
853            // Sink operations - semantic tags extracted from input metadata
854            HydroRoot::ForEach { f, input, .. } => build_sink_node(
855                structure,
856                seen_tees,
857                config,
858                input,
859                None,
860                NodeLabel::with_exprs("for_each".to_owned(), vec![f.clone()]),
861            ),
862
863            HydroRoot::SendExternal {
864                to_external_key,
865                to_port_id,
866                input,
867                ..
868            } => build_sink_node(
869                structure,
870                seen_tees,
871                config,
872                input,
873                None,
874                NodeLabel::with_exprs(
875                    format!("send_external({}:{})", to_external_key, to_port_id),
876                    vec![],
877                ),
878            ),
879
880            HydroRoot::DestSink { sink, input, .. } => build_sink_node(
881                structure,
882                seen_tees,
883                config,
884                input,
885                None,
886                NodeLabel::with_exprs("dest_sink".to_owned(), vec![sink.clone()]),
887            ),
888
889            HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
890                structure,
891                seen_tees,
892                config,
893                input,
894                None,
895                NodeLabel::static_label(format!("cycle_sink({})", ident)),
896            ),
897
898            HydroRoot::EmbeddedOutput { ident, input, .. } => build_sink_node(
899                structure,
900                seen_tees,
901                config,
902                input,
903                None,
904                NodeLabel::static_label(format!("embedded_output({})", ident)),
905            ),
906        }
907    }
908}
909
910impl HydroNode {
911    /// Build the graph structure recursively for this node.
912    pub fn build_graph_structure(
913        &self,
914        structure: &mut HydroGraphStructure,
915        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
916        config: HydroWriteConfig<'_>,
917    ) -> VizNodeKey {
918        // Helper functions to reduce duplication, categorized by input/expression patterns
919
920        /// Common parameters for transform builder functions to reduce argument count
921        struct TransformParams<'a> {
922            structure: &'a mut HydroGraphStructure,
923            seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
924            config: HydroWriteConfig<'a>,
925            input: &'a HydroNode,
926            metadata: &'a HydroIrMetadata,
927            op_name: String,
928            node_type: HydroNodeType,
929        }
930
931        // Single-input transform with no expressions
932        fn build_simple_transform(params: TransformParams) -> VizNodeKey {
933            let input_id = params.input.build_graph_structure(
934                params.structure,
935                params.seen_tees,
936                params.config,
937            );
938            let node_id = params.structure.add_node_with_metadata(
939                NodeLabel::Static(params.op_name.to_string()),
940                params.node_type,
941                params.metadata,
942            );
943
944            // Extract semantic tags from input metadata
945            let input_metadata = params.input.metadata();
946            add_edge_with_metadata(
947                params.structure,
948                input_id,
949                node_id,
950                Some(input_metadata),
951                Some(params.metadata),
952                None,
953            );
954
955            node_id
956        }
957
958        // Single-input transform with one expression
959        fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> VizNodeKey {
960            let input_id = params.input.build_graph_structure(
961                params.structure,
962                params.seen_tees,
963                params.config,
964            );
965            let node_id = params.structure.add_node_with_metadata(
966                NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
967                params.node_type,
968                params.metadata,
969            );
970
971            // Extract semantic tags from input metadata
972            let input_metadata = params.input.metadata();
973            add_edge_with_metadata(
974                params.structure,
975                input_id,
976                node_id,
977                Some(input_metadata),
978                Some(params.metadata),
979                None,
980            );
981
982            node_id
983        }
984
985        // Single-input transform with two expressions
986        fn build_dual_expr_transform(
987            params: TransformParams,
988            expr1: &DebugExpr,
989            expr2: &DebugExpr,
990        ) -> VizNodeKey {
991            let input_id = params.input.build_graph_structure(
992                params.structure,
993                params.seen_tees,
994                params.config,
995            );
996            let node_id = params.structure.add_node_with_metadata(
997                NodeLabel::with_exprs(
998                    params.op_name.to_string(),
999                    vec![expr1.clone(), expr2.clone()],
1000                ),
1001                params.node_type,
1002                params.metadata,
1003            );
1004
1005            // Extract semantic tags from input metadata
1006            let input_metadata = params.input.metadata();
1007            add_edge_with_metadata(
1008                params.structure,
1009                input_id,
1010                node_id,
1011                Some(input_metadata),
1012                Some(params.metadata),
1013                None,
1014            );
1015
1016            node_id
1017        }
1018
1019        // Helper function for source nodes
1020        fn build_source_node(
1021            structure: &mut HydroGraphStructure,
1022            metadata: &HydroIrMetadata,
1023            label: String,
1024        ) -> VizNodeKey {
1025            structure.add_node_with_metadata(
1026                NodeLabel::Static(label),
1027                HydroNodeType::Source,
1028                metadata,
1029            )
1030        }
1031
1032        match self {
1033            HydroNode::Placeholder => structure.add_node(
1034                NodeLabel::Static("PLACEHOLDER".to_owned()),
1035                HydroNodeType::Transform,
1036                None,
1037            ),
1038
1039            HydroNode::Source {
1040                source, metadata, ..
1041            } => {
1042                let label = match source {
1043                    HydroSource::Stream(expr) => format!("source_stream({})", expr),
1044                    HydroSource::ExternalNetwork() => "external_network()".to_owned(),
1045                    HydroSource::Iter(expr) => format!("source_iter({})", expr),
1046                    HydroSource::Spin() => "spin()".to_owned(),
1047                    HydroSource::ClusterMembers(location_id) => {
1048                        format!(
1049                            "source_stream(cluster_membership_stream({:?}))",
1050                            location_id
1051                        )
1052                    }
1053                    HydroSource::Embedded(ident) => {
1054                        format!("embedded_input({})", ident)
1055                    }
1056                };
1057                build_source_node(structure, metadata, label)
1058            }
1059
1060            HydroNode::SingletonSource { value, metadata } => {
1061                let label = format!("singleton({})", value);
1062                build_source_node(structure, metadata, label)
1063            }
1064
1065            HydroNode::ExternalInput {
1066                from_external_key,
1067                from_port_id,
1068                metadata,
1069                ..
1070            } => build_source_node(
1071                structure,
1072                metadata,
1073                format!("external_input({}:{})", from_external_key, from_port_id),
1074            ),
1075
1076            HydroNode::CycleSource {
1077                ident, metadata, ..
1078            } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
1079
1080            HydroNode::Tee { inner, metadata } => {
1081                let ptr = inner.as_ptr();
1082                if let Some(&existing_id) = seen_tees.get(&ptr) {
1083                    return existing_id;
1084                }
1085
1086                let input_id = inner
1087                    .0
1088                    .borrow()
1089                    .build_graph_structure(structure, seen_tees, config);
1090                let tee_id = structure.add_node_with_metadata(
1091                    NodeLabel::Static(extract_op_name(self.print_root())),
1092                    HydroNodeType::Tee,
1093                    metadata,
1094                );
1095
1096                seen_tees.insert(ptr, tee_id);
1097
1098                // Extract semantic tags from input
1099                let inner_borrow = inner.0.borrow();
1100                let input_metadata = inner_borrow.metadata();
1101                add_edge_with_metadata(
1102                    structure,
1103                    input_id,
1104                    tee_id,
1105                    Some(input_metadata),
1106                    Some(metadata),
1107                    None,
1108                );
1109                drop(inner_borrow);
1110
1111                tee_id
1112            }
1113
1114            // Non-deterministic operation
1115            HydroNode::ObserveNonDet {
1116                inner, metadata, ..
1117            } => build_simple_transform(TransformParams {
1118                structure,
1119                seen_tees,
1120                config,
1121                input: inner,
1122                metadata,
1123                op_name: extract_op_name(self.print_root()),
1124                node_type: HydroNodeType::NonDeterministic,
1125            }),
1126
1127            // Transform operations with Stream edges - grouped by node/edge type
1128            HydroNode::Cast { inner, metadata }
1129            | HydroNode::DeferTick {
1130                input: inner,
1131                metadata,
1132            }
1133            | HydroNode::Enumerate {
1134                input: inner,
1135                metadata,
1136                ..
1137            }
1138            | HydroNode::Unique {
1139                input: inner,
1140                metadata,
1141            }
1142            | HydroNode::ResolveFutures {
1143                input: inner,
1144                metadata,
1145            }
1146            | HydroNode::ResolveFuturesOrdered {
1147                input: inner,
1148                metadata,
1149            } => build_simple_transform(TransformParams {
1150                structure,
1151                seen_tees,
1152                config,
1153                input: inner,
1154                metadata,
1155                op_name: extract_op_name(self.print_root()),
1156                node_type: HydroNodeType::Transform,
1157            }),
1158
1159            // Aggregation operation - semantic tags extracted from metadata
1160            HydroNode::Sort {
1161                input: inner,
1162                metadata,
1163            } => build_simple_transform(TransformParams {
1164                structure,
1165                seen_tees,
1166                config,
1167                input: inner,
1168                metadata,
1169                op_name: extract_op_name(self.print_root()),
1170                node_type: HydroNodeType::Aggregation,
1171            }),
1172
1173            // Single-expression Transform operations - grouped by node type
1174            HydroNode::Map { f, input, metadata }
1175            | HydroNode::Filter { f, input, metadata }
1176            | HydroNode::FlatMap { f, input, metadata }
1177            | HydroNode::FilterMap { f, input, metadata }
1178            | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
1179                TransformParams {
1180                    structure,
1181                    seen_tees,
1182                    config,
1183                    input,
1184                    metadata,
1185                    op_name: extract_op_name(self.print_root()),
1186                    node_type: HydroNodeType::Transform,
1187                },
1188                f,
1189            ),
1190
1191            // Single-expression Aggregation operations - grouped by node type
1192            HydroNode::Reduce { f, input, metadata }
1193            | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
1194                TransformParams {
1195                    structure,
1196                    seen_tees,
1197                    config,
1198                    input,
1199                    metadata,
1200                    op_name: extract_op_name(self.print_root()),
1201                    node_type: HydroNodeType::Aggregation,
1202                },
1203                f,
1204            ),
1205
1206            // Join-like operations with left/right edge labels - grouped by edge labeling
1207            HydroNode::Join {
1208                left,
1209                right,
1210                metadata,
1211            }
1212            | HydroNode::CrossProduct {
1213                left,
1214                right,
1215                metadata,
1216            }
1217            | HydroNode::CrossSingleton {
1218                left,
1219                right,
1220                metadata,
1221            } => {
1222                let left_id = left.build_graph_structure(structure, seen_tees, config);
1223                let right_id = right.build_graph_structure(structure, seen_tees, config);
1224                let node_id = structure.add_node_with_metadata(
1225                    NodeLabel::Static(extract_op_name(self.print_root())),
1226                    HydroNodeType::Join,
1227                    metadata,
1228                );
1229
1230                // Extract semantic tags for left edge
1231                let left_metadata = left.metadata();
1232                add_edge_with_metadata(
1233                    structure,
1234                    left_id,
1235                    node_id,
1236                    Some(left_metadata),
1237                    Some(metadata),
1238                    Some("left".to_owned()),
1239                );
1240
1241                // Extract semantic tags for right edge
1242                let right_metadata = right.metadata();
1243                add_edge_with_metadata(
1244                    structure,
1245                    right_id,
1246                    node_id,
1247                    Some(right_metadata),
1248                    Some(metadata),
1249                    Some("right".to_owned()),
1250                );
1251
1252                node_id
1253            }
1254
1255            // Join-like operations with pos/neg edge labels - grouped by edge labeling
1256            HydroNode::Difference {
1257                pos: left,
1258                neg: right,
1259                metadata,
1260            }
1261            | HydroNode::AntiJoin {
1262                pos: left,
1263                neg: right,
1264                metadata,
1265            } => {
1266                let left_id = left.build_graph_structure(structure, seen_tees, config);
1267                let right_id = right.build_graph_structure(structure, seen_tees, config);
1268                let node_id = structure.add_node_with_metadata(
1269                    NodeLabel::Static(extract_op_name(self.print_root())),
1270                    HydroNodeType::Join,
1271                    metadata,
1272                );
1273
1274                // Extract semantic tags for pos edge
1275                let left_metadata = left.metadata();
1276                add_edge_with_metadata(
1277                    structure,
1278                    left_id,
1279                    node_id,
1280                    Some(left_metadata),
1281                    Some(metadata),
1282                    Some("pos".to_owned()),
1283                );
1284
1285                // Extract semantic tags for neg edge
1286                let right_metadata = right.metadata();
1287                add_edge_with_metadata(
1288                    structure,
1289                    right_id,
1290                    node_id,
1291                    Some(right_metadata),
1292                    Some(metadata),
1293                    Some("neg".to_owned()),
1294                );
1295
1296                node_id
1297            }
1298
1299            // Dual expression transforms - consolidated using pattern matching
1300            HydroNode::Fold {
1301                init,
1302                acc,
1303                input,
1304                metadata,
1305            }
1306            | HydroNode::FoldKeyed {
1307                init,
1308                acc,
1309                input,
1310                metadata,
1311            }
1312            | HydroNode::Scan {
1313                init,
1314                acc,
1315                input,
1316                metadata,
1317            } => {
1318                let node_type = HydroNodeType::Aggregation; // All are aggregation operations
1319
1320                build_dual_expr_transform(
1321                    TransformParams {
1322                        structure,
1323                        seen_tees,
1324                        config,
1325                        input,
1326                        metadata,
1327                        op_name: extract_op_name(self.print_root()),
1328                        node_type,
1329                    },
1330                    init,
1331                    acc,
1332                )
1333            }
1334
1335            // Combination of join and transform
1336            HydroNode::ReduceKeyedWatermark {
1337                f,
1338                input,
1339                watermark,
1340                metadata,
1341            } => {
1342                let input_id = input.build_graph_structure(structure, seen_tees, config);
1343                let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
1344                let location_key = Some(setup_location(structure, metadata));
1345                let join_node_id = structure.add_node_with_backtrace(
1346                    NodeLabel::Static(extract_op_name(self.print_root())),
1347                    HydroNodeType::Join,
1348                    location_key,
1349                    Some(metadata.op.backtrace.clone()),
1350                );
1351
1352                // Extract semantic tags for input edge
1353                let input_metadata = input.metadata();
1354                add_edge_with_metadata(
1355                    structure,
1356                    input_id,
1357                    join_node_id,
1358                    Some(input_metadata),
1359                    Some(metadata),
1360                    Some("input".to_owned()),
1361                );
1362
1363                // Extract semantic tags for watermark edge
1364                let watermark_metadata = watermark.metadata();
1365                add_edge_with_metadata(
1366                    structure,
1367                    watermark_id,
1368                    join_node_id,
1369                    Some(watermark_metadata),
1370                    Some(metadata),
1371                    Some("watermark".to_owned()),
1372                );
1373
1374                let node_id = structure.add_node_with_backtrace(
1375                    NodeLabel::with_exprs(extract_op_name(self.print_root()), vec![f.clone()]),
1376                    HydroNodeType::Aggregation,
1377                    location_key,
1378                    Some(metadata.op.backtrace.clone()),
1379                );
1380
1381                // Edge from join to aggregation node
1382                let join_metadata = metadata; // Use the same metadata
1383                add_edge_with_metadata(
1384                    structure,
1385                    join_node_id,
1386                    node_id,
1387                    Some(join_metadata),
1388                    Some(metadata),
1389                    None,
1390                );
1391
1392                node_id
1393            }
1394
1395            HydroNode::Network {
1396                serialize_fn,
1397                deserialize_fn,
1398                input,
1399                metadata,
1400                ..
1401            } => {
1402                let input_id = input.build_graph_structure(structure, seen_tees, config);
1403                let _from_location_key = setup_location(structure, metadata);
1404
1405                let root = metadata.location_id.root();
1406                let to_location_key = root.key();
1407                let to_location_type = root.location_type().unwrap();
1408                structure.add_location(to_location_key, to_location_type);
1409
1410                let mut label = "network(".to_owned();
1411                if serialize_fn.is_some() {
1412                    label.push_str("send");
1413                }
1414                if deserialize_fn.is_some() {
1415                    if serialize_fn.is_some() {
1416                        label.push_str(" + ");
1417                    }
1418                    label.push_str("recv");
1419                }
1420                label.push(')');
1421
1422                let network_id = structure.add_node_with_backtrace(
1423                    NodeLabel::Static(label),
1424                    HydroNodeType::Network,
1425                    Some(to_location_key),
1426                    Some(metadata.op.backtrace.clone()),
1427                );
1428
1429                // Extract semantic tags for network edge
1430                let input_metadata = input.metadata();
1431                add_edge_with_metadata(
1432                    structure,
1433                    input_id,
1434                    network_id,
1435                    Some(input_metadata),
1436                    Some(metadata),
1437                    Some(format!("to {:?}({})", to_location_type, to_location_key)),
1438                );
1439
1440                network_id
1441            }
1442
1443            // Non-deterministic batch operation
1444            HydroNode::Batch { inner, metadata } => build_simple_transform(TransformParams {
1445                structure,
1446                seen_tees,
1447                config,
1448                input: inner,
1449                metadata,
1450                op_name: extract_op_name(self.print_root()),
1451                node_type: HydroNodeType::NonDeterministic,
1452            }),
1453
1454            HydroNode::YieldConcat { inner, .. } => {
1455                // Unpersist is typically optimized away, just pass through
1456                inner.build_graph_structure(structure, seen_tees, config)
1457            }
1458
1459            HydroNode::BeginAtomic { inner, .. } => {
1460                inner.build_graph_structure(structure, seen_tees, config)
1461            }
1462
1463            HydroNode::EndAtomic { inner, .. } => {
1464                inner.build_graph_structure(structure, seen_tees, config)
1465            }
1466
1467            HydroNode::Chain {
1468                first,
1469                second,
1470                metadata,
1471            } => {
1472                let first_id = first.build_graph_structure(structure, seen_tees, config);
1473                let second_id = second.build_graph_structure(structure, seen_tees, config);
1474                let location_key = Some(setup_location(structure, metadata));
1475                let chain_id = structure.add_node_with_backtrace(
1476                    NodeLabel::Static(extract_op_name(self.print_root())),
1477                    HydroNodeType::Transform,
1478                    location_key,
1479                    Some(metadata.op.backtrace.clone()),
1480                );
1481
1482                // Extract semantic tags for first edge
1483                let first_metadata = first.metadata();
1484                add_edge_with_metadata(
1485                    structure,
1486                    first_id,
1487                    chain_id,
1488                    Some(first_metadata),
1489                    Some(metadata),
1490                    Some("first".to_owned()),
1491                );
1492
1493                // Extract semantic tags for second edge
1494                let second_metadata = second.metadata();
1495                add_edge_with_metadata(
1496                    structure,
1497                    second_id,
1498                    chain_id,
1499                    Some(second_metadata),
1500                    Some(metadata),
1501                    Some("second".to_owned()),
1502                );
1503
1504                chain_id
1505            }
1506
1507            HydroNode::ChainFirst {
1508                first,
1509                second,
1510                metadata,
1511            } => {
1512                let first_id = first.build_graph_structure(structure, seen_tees, config);
1513                let second_id = second.build_graph_structure(structure, seen_tees, config);
1514                let location_key = Some(setup_location(structure, metadata));
1515                let chain_id = structure.add_node_with_backtrace(
1516                    NodeLabel::Static(extract_op_name(self.print_root())),
1517                    HydroNodeType::Transform,
1518                    location_key,
1519                    Some(metadata.op.backtrace.clone()),
1520                );
1521
1522                // Extract semantic tags for first edge
1523                let first_metadata = first.metadata();
1524                add_edge_with_metadata(
1525                    structure,
1526                    first_id,
1527                    chain_id,
1528                    Some(first_metadata),
1529                    Some(metadata),
1530                    Some("first".to_owned()),
1531                );
1532
1533                // Extract semantic tags for second edge
1534                let second_metadata = second.metadata();
1535                add_edge_with_metadata(
1536                    structure,
1537                    second_id,
1538                    chain_id,
1539                    Some(second_metadata),
1540                    Some(metadata),
1541                    Some("second".to_owned()),
1542                );
1543
1544                chain_id
1545            }
1546
1547            HydroNode::Counter {
1548                tag: _,
1549                prefix: _,
1550                duration,
1551                input,
1552                metadata,
1553            } => build_single_expr_transform(
1554                TransformParams {
1555                    structure,
1556                    seen_tees,
1557                    config,
1558                    input,
1559                    metadata,
1560                    op_name: extract_op_name(self.print_root()),
1561                    node_type: HydroNodeType::Transform,
1562                },
1563                duration,
1564            ),
1565        }
1566    }
1567}
1568
1569/// Utility functions for rendering multiple roots as a single graph.
1570/// Macro to reduce duplication in render functions.
1571macro_rules! render_hydro_ir {
1572    ($name:ident, $write_fn:ident) => {
1573        pub fn $name(roots: &[HydroRoot], config: HydroWriteConfig<'_>) -> String {
1574            let mut output = String::new();
1575            $write_fn(&mut output, roots, config).unwrap();
1576            output
1577        }
1578    };
1579}
1580
1581/// Macro to reduce duplication in write functions.
1582macro_rules! write_hydro_ir {
1583    ($name:ident, $writer_type:ty, $constructor:expr) => {
1584        pub fn $name(
1585            output: impl std::fmt::Write,
1586            roots: &[HydroRoot],
1587            config: HydroWriteConfig<'_>,
1588        ) -> std::fmt::Result {
1589            let mut graph_write: $writer_type = $constructor(output, config);
1590            write_hydro_ir_graph(&mut graph_write, roots, config)
1591        }
1592    };
1593}
1594
1595render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1596write_hydro_ir!(
1597    write_hydro_ir_mermaid,
1598    HydroMermaid<_>,
1599    HydroMermaid::new_with_config
1600);
1601
1602render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1603write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1604
1605// Legacy hydroscope function - now uses HydroJson for consistency
1606render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);
1607
1608// JSON rendering
1609render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
1610write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
1611
1612fn write_hydro_ir_graph<W>(
1613    graph_write: W,
1614    roots: &[HydroRoot],
1615    config: HydroWriteConfig<'_>,
1616) -> Result<(), W::Err>
1617where
1618    W: HydroGraphWrite,
1619{
1620    let mut structure = HydroGraphStructure::new();
1621    let mut seen_tees = HashMap::new();
1622
1623    // Build the graph structure for all roots
1624    for leaf in roots {
1625        leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1626    }
1627
1628    write_graph_structure(&structure, graph_write, config)
1629}