hydro_lang/viz/
render.rs

1use std::collections::{HashMap, HashSet};
2use std::error::Error;
3use std::fmt::{Display, Write};
4use std::num::ParseIntError;
5use std::str::FromStr;
6
7use auto_impl::auto_impl;
8use slotmap::{KeyData, SlotMap};
9
10pub use super::graphviz::{HydroDot, escape_dot};
11pub use super::json::HydroJson;
12// Re-export specific implementations
13pub use super::mermaid::{HydroMermaid, escape_mermaid};
14use crate::compile::ir::backtrace::Backtrace;
15use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
16use crate::location::dynamic::LocationId;
17
18/// Label for a graph node - can be either a static string or contain expressions.
19#[derive(Debug, Clone)]
20pub enum NodeLabel {
21    /// A static string label
22    Static(String),
23    /// A label with an operation name and expression arguments
24    WithExprs {
25        op_name: String,
26        exprs: Vec<DebugExpr>,
27    },
28}
29
30impl NodeLabel {
31    /// Create a static label
32    pub fn static_label(s: String) -> Self {
33        Self::Static(s)
34    }
35
36    /// Create a label for an operation with multiple expression
37    pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
38        Self::WithExprs { op_name, exprs }
39    }
40}
41
42impl Display for NodeLabel {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        match self {
45            Self::Static(s) => write!(f, "{}", s),
46            Self::WithExprs { op_name, exprs } => {
47                if exprs.is_empty() {
48                    write!(f, "{}()", op_name)
49                } else {
50                    let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
51                    write!(f, "{}({})", op_name, expr_strs.join(", "))
52                }
53            }
54        }
55    }
56}
57
58/// Base struct for text-based graph writers that use indentation.
59/// Contains common fields shared by DOT and Mermaid writers.
60pub struct IndentedGraphWriter<W> {
61    pub write: W,
62    pub indent: usize,
63    pub config: HydroWriteConfig,
64}
65
66impl<W> IndentedGraphWriter<W> {
67    /// Create a new writer with default configuration.
68    pub fn new(write: W) -> Self {
69        Self {
70            write,
71            indent: 0,
72            config: HydroWriteConfig::default(),
73        }
74    }
75
76    /// Create a new writer with the given configuration.
77    pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
78        Self {
79            write,
80            indent: 0,
81            config: config.clone(),
82        }
83    }
84}
85
86impl<W: Write> IndentedGraphWriter<W> {
87    /// Write an indented line using the current indentation level.
88    pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
89        writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
90    }
91}
92
93/// Common error type used by all graph writers.
94pub type GraphWriteError = std::fmt::Error;
95
96/// Trait for writing textual representations of Hydro IR graphs, i.e. mermaid or dot graphs.
97#[auto_impl(&mut, Box)]
98pub trait HydroGraphWrite {
99    /// Error type emitted by writing.
100    type Err: Error;
101
102    /// Begin the graph. First method called.
103    fn write_prologue(&mut self) -> Result<(), Self::Err>;
104
105    /// Write a node definition with styling.
106    fn write_node_definition(
107        &mut self,
108        node_id: VizNodeKey,
109        node_label: &NodeLabel,
110        node_type: HydroNodeType,
111        location_id: Option<usize>,
112        location_type: Option<&str>,
113        backtrace: Option<&Backtrace>,
114    ) -> Result<(), Self::Err>;
115
116    /// Write an edge between nodes with optional labeling.
117    fn write_edge(
118        &mut self,
119        src_id: VizNodeKey,
120        dst_id: VizNodeKey,
121        edge_properties: &HashSet<HydroEdgeProp>,
122        label: Option<&str>,
123    ) -> Result<(), Self::Err>;
124
125    /// Begin writing a location grouping (process/cluster).
126    fn write_location_start(
127        &mut self,
128        location_id: usize,
129        location_type: &str,
130    ) -> Result<(), Self::Err>;
131
132    /// Write a node within a location.
133    fn write_node(&mut self, node_id: VizNodeKey) -> Result<(), Self::Err>;
134
135    /// End writing a location grouping.
136    fn write_location_end(&mut self) -> Result<(), Self::Err>;
137
138    /// End the graph. Last method called.
139    fn write_epilogue(&mut self) -> Result<(), Self::Err>;
140}
141
142/// Node type utilities - centralized handling of HydroNodeType operations
143pub mod node_type_utils {
144    use super::HydroNodeType;
145
146    /// All node types with their string names
147    const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
148        (HydroNodeType::Source, "Source"),
149        (HydroNodeType::Transform, "Transform"),
150        (HydroNodeType::Join, "Join"),
151        (HydroNodeType::Aggregation, "Aggregation"),
152        (HydroNodeType::Network, "Network"),
153        (HydroNodeType::Sink, "Sink"),
154        (HydroNodeType::Tee, "Tee"),
155        (HydroNodeType::NonDeterministic, "NonDeterministic"),
156    ];
157
158    /// Convert HydroNodeType to string representation (used by JSON format)
159    pub fn to_string(node_type: HydroNodeType) -> &'static str {
160        NODE_TYPE_DATA
161            .iter()
162            .find(|(nt, _)| *nt == node_type)
163            .map(|(_, name)| *name)
164            .unwrap_or("Unknown")
165    }
166
167    /// Get all node types with their string representations (used by JSON format)
168    pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
169        NODE_TYPE_DATA.to_vec()
170    }
171}
172
173/// Types of nodes in Hydro IR for styling purposes.
174#[derive(Debug, Clone, Copy, PartialEq, Eq)]
175pub enum HydroNodeType {
176    Source,
177    Transform,
178    Join,
179    Aggregation,
180    Network,
181    Sink,
182    Tee,
183    NonDeterministic,
184}
185
186/// Types of edges in Hydro IR representing stream properties.
187#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
188pub enum HydroEdgeProp {
189    Bounded,
190    Unbounded,
191    TotalOrder,
192    NoOrder,
193    Keyed,
194    // Collection type tags for styling
195    Stream,
196    KeyedSingleton,
197    KeyedStream,
198    Singleton,
199    Optional,
200    Network,
201    Cycle,
202}
203
204/// Unified edge style representation for all graph formats.
205/// This intermediate format allows consistent styling across JSON, DOT, and Mermaid.
206#[derive(Debug, Clone, PartialEq, Eq)]
207pub struct UnifiedEdgeStyle {
208    /// Line pattern (solid, dashed)
209    pub line_pattern: LinePattern,
210    /// Line width (1 = thin, 3 = thick)
211    pub line_width: u8,
212    /// Arrowhead style
213    pub arrowhead: ArrowheadStyle,
214    /// Line style (single plain line, or line with hash marks/dots for keyed streams)
215    pub line_style: LineStyle,
216    /// Halo/background effect for boundedness
217    pub halo: HaloStyle,
218    /// Line waviness for ordering information
219    pub waviness: WavinessStyle,
220    /// Whether animation is enabled (JSON only)
221    pub animation: AnimationStyle,
222    /// Color for the edge
223    pub color: &'static str,
224}
225
226#[derive(Debug, Clone, Copy, PartialEq, Eq)]
227pub enum LinePattern {
228    Solid,
229    Dotted,
230    Dashed,
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq)]
234pub enum ArrowheadStyle {
235    TriangleFilled,
236    CircleFilled,
237    DiamondOpen,
238    Default,
239}
240
241#[derive(Debug, Clone, Copy, PartialEq, Eq)]
242pub enum LineStyle {
243    /// Plain single line
244    Single,
245    /// Single line with hash marks/dots (for keyed streams)
246    HashMarks,
247}
248
249#[derive(Debug, Clone, Copy, PartialEq, Eq)]
250pub enum HaloStyle {
251    None,
252    LightBlue,
253}
254
255#[derive(Debug, Clone, Copy, PartialEq, Eq)]
256pub enum WavinessStyle {
257    None,
258    Wavy,
259}
260
261#[derive(Debug, Clone, Copy, PartialEq, Eq)]
262pub enum AnimationStyle {
263    Static,
264    Animated,
265}
266
267impl Default for UnifiedEdgeStyle {
268    fn default() -> Self {
269        Self {
270            line_pattern: LinePattern::Solid,
271            line_width: 1,
272            arrowhead: ArrowheadStyle::Default,
273            line_style: LineStyle::Single,
274            halo: HaloStyle::None,
275            waviness: WavinessStyle::None,
276            animation: AnimationStyle::Static,
277            color: "#666666",
278        }
279    }
280}
281
282/// Convert HydroEdgeType properties to unified edge style.
283/// This is the core logic for determining edge visual properties.
284///
285/// # Visual Encoding Mapping
286///
287/// | Semantic Property | Visual Channel | Values |
288/// |------------------|----------------|---------|
289/// | Network | Line Pattern + Animation | Local (solid, static), Network (dashed, animated) |
290/// | Ordering | Waviness | TotalOrder (straight), NoOrder (wavy) |
291/// | Boundedness | Halo | Bounded (none), Unbounded (light-blue transparent) |
292/// | Keyedness | Line Style | NotKeyed (plain line), Keyed (line with hash marks/dots) |
293/// | Collection Type | Color + Arrowhead | Stream (blue #2563eb, triangle), Singleton (black, circle), Optional (gray, diamond) |
294pub fn get_unified_edge_style(
295    edge_properties: &HashSet<HydroEdgeProp>,
296    src_location: Option<usize>,
297    dst_location: Option<usize>,
298) -> UnifiedEdgeStyle {
299    let mut style = UnifiedEdgeStyle::default();
300
301    // Network communication group - controls line pattern AND animation
302    let is_network = edge_properties.contains(&HydroEdgeProp::Network)
303        || (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
304
305    if is_network {
306        style.line_pattern = LinePattern::Dashed;
307        style.animation = AnimationStyle::Animated;
308    } else {
309        style.line_pattern = LinePattern::Solid;
310        style.animation = AnimationStyle::Static;
311    }
312
313    // Boundedness group - controls halo
314    if edge_properties.contains(&HydroEdgeProp::Unbounded) {
315        style.halo = HaloStyle::LightBlue;
316    } else {
317        style.halo = HaloStyle::None;
318    }
319
320    // Collection type group - controls arrowhead and color
321    if edge_properties.contains(&HydroEdgeProp::Stream) {
322        style.arrowhead = ArrowheadStyle::TriangleFilled;
323        style.color = "#2563eb"; // Bright blue for Stream
324    } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
325        style.arrowhead = ArrowheadStyle::TriangleFilled;
326        style.color = "#2563eb"; // Bright blue for Stream (keyed variant)
327    } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
328        style.arrowhead = ArrowheadStyle::TriangleFilled;
329        style.color = "#000000"; // Black for Singleton (keyed variant)
330    } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
331        style.arrowhead = ArrowheadStyle::CircleFilled;
332        style.color = "#000000"; // Black for Singleton
333    } else if edge_properties.contains(&HydroEdgeProp::Optional) {
334        style.arrowhead = ArrowheadStyle::DiamondOpen;
335        style.color = "#6b7280"; // Gray for Optional
336    }
337
338    // Keyedness group - controls hash marks on the line
339    if edge_properties.contains(&HydroEdgeProp::Keyed) {
340        style.line_style = LineStyle::HashMarks; // Renders as hash marks/dots on the line in hydroscope
341    } else {
342        style.line_style = LineStyle::Single;
343    }
344
345    // Ordering group - waviness channel
346    if edge_properties.contains(&HydroEdgeProp::NoOrder) {
347        style.waviness = WavinessStyle::Wavy;
348    } else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
349        style.waviness = WavinessStyle::None;
350    }
351
352    style
353}
354
355/// Extract semantic edge properties from CollectionKind metadata.
356/// This function analyzes the collection type and extracts relevant semantic tags
357/// for visualization purposes.
358pub fn extract_edge_properties_from_collection_kind(
359    collection_kind: &crate::compile::ir::CollectionKind,
360) -> HashSet<HydroEdgeProp> {
361    use crate::compile::ir::CollectionKind;
362
363    let mut properties = HashSet::new();
364
365    match collection_kind {
366        CollectionKind::Stream { bound, order, .. } => {
367            properties.insert(HydroEdgeProp::Stream);
368            add_bound_property(&mut properties, bound);
369            add_order_property(&mut properties, order);
370        }
371        CollectionKind::KeyedStream {
372            bound, value_order, ..
373        } => {
374            properties.insert(HydroEdgeProp::KeyedStream);
375            properties.insert(HydroEdgeProp::Keyed);
376            add_bound_property(&mut properties, bound);
377            add_order_property(&mut properties, value_order);
378        }
379        CollectionKind::Singleton { bound, .. } => {
380            properties.insert(HydroEdgeProp::Singleton);
381            add_bound_property(&mut properties, bound);
382            // Singletons have implicit TotalOrder
383            properties.insert(HydroEdgeProp::TotalOrder);
384        }
385        CollectionKind::Optional { bound, .. } => {
386            properties.insert(HydroEdgeProp::Optional);
387            add_bound_property(&mut properties, bound);
388            // Optionals have implicit TotalOrder
389            properties.insert(HydroEdgeProp::TotalOrder);
390        }
391        CollectionKind::KeyedSingleton { bound, .. } => {
392            properties.insert(HydroEdgeProp::Singleton);
393            properties.insert(HydroEdgeProp::Keyed);
394            // KeyedSingletons boundedness depends on the bound kind
395            add_keyed_singleton_bound_property(&mut properties, bound);
396            properties.insert(HydroEdgeProp::TotalOrder);
397        }
398    }
399
400    properties
401}
402
403/// Helper function to add bound property based on BoundKind.
404fn add_bound_property(
405    properties: &mut HashSet<HydroEdgeProp>,
406    bound: &crate::compile::ir::BoundKind,
407) {
408    use crate::compile::ir::BoundKind;
409
410    match bound {
411        BoundKind::Bounded => {
412            properties.insert(HydroEdgeProp::Bounded);
413        }
414        BoundKind::Unbounded => {
415            properties.insert(HydroEdgeProp::Unbounded);
416        }
417    }
418}
419
420/// Helper function to add bound property for KeyedSingleton based on KeyedSingletonBoundKind.
421fn add_keyed_singleton_bound_property(
422    properties: &mut HashSet<HydroEdgeProp>,
423    bound: &crate::compile::ir::KeyedSingletonBoundKind,
424) {
425    use crate::compile::ir::KeyedSingletonBoundKind;
426
427    match bound {
428        KeyedSingletonBoundKind::Bounded | KeyedSingletonBoundKind::BoundedValue => {
429            properties.insert(HydroEdgeProp::Bounded);
430        }
431        KeyedSingletonBoundKind::Unbounded => {
432            properties.insert(HydroEdgeProp::Unbounded);
433        }
434    }
435}
436
437/// Helper function to add order property based on StreamOrder.
438fn add_order_property(
439    properties: &mut HashSet<HydroEdgeProp>,
440    order: &crate::compile::ir::StreamOrder,
441) {
442    use crate::compile::ir::StreamOrder;
443
444    match order {
445        StreamOrder::TotalOrder => {
446            properties.insert(HydroEdgeProp::TotalOrder);
447        }
448        StreamOrder::NoOrder => {
449            properties.insert(HydroEdgeProp::NoOrder);
450        }
451    }
452}
453
454/// Detect if an edge crosses network boundaries by comparing source and destination locations.
455/// Returns true if the edge represents network communication between different locations.
456pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
457    // Compare the root locations to determine if they differ
458    src_location.root() != dst_location.root()
459}
460
461/// Add network edge tag if source and destination locations differ.
462pub fn add_network_edge_tag(
463    properties: &mut HashSet<HydroEdgeProp>,
464    src_location: &LocationId,
465    dst_location: &LocationId,
466) {
467    if is_network_edge(src_location, dst_location) {
468        properties.insert(HydroEdgeProp::Network);
469    }
470}
471
472/// Configuration for graph writing.
473#[derive(Debug, Clone)]
474pub struct HydroWriteConfig {
475    pub show_metadata: bool,
476    pub show_location_groups: bool,
477    pub use_short_labels: bool,
478    pub process_id_name: Vec<(usize, String)>,
479    pub cluster_id_name: Vec<(usize, String)>,
480    pub external_id_name: Vec<(usize, String)>,
481}
482
483impl Default for HydroWriteConfig {
484    fn default() -> Self {
485        Self {
486            show_metadata: false,
487            show_location_groups: true,
488            use_short_labels: true, // Default to short labels for all renderers
489            process_id_name: vec![],
490            cluster_id_name: vec![],
491            external_id_name: vec![],
492        }
493    }
494}
495
496/// Node information in the Hydro graph.
497#[derive(Clone)]
498pub struct HydroGraphNode {
499    pub label: NodeLabel,
500    pub node_type: HydroNodeType,
501    pub location: Option<usize>,
502    pub backtrace: Option<Backtrace>,
503}
504
505slotmap::new_key_type! {
506    /// Unique identifier for nodes in the visualization graph.
507    ///
508    /// This is counted/allocated separately from any other IDs within `hydro_lang`.
509    pub struct VizNodeKey;
510}
511
512impl Display for VizNodeKey {
513    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
514        write!(f, "{:?}", self) // `"VizNodeKey(1v1)"`
515    }
516}
517
518impl FromStr for VizNodeKey {
519    type Err = Option<ParseIntError>;
520
521    fn from_str(s: &str) -> Result<Self, Self::Err> {
522        // Parse `"VizNodeKey(1v1)"`
523        let s = s.strip_prefix("VizNodeKey(").ok_or(None)?;
524        let s = s.strip_suffix(")").ok_or(None)?;
525        let (idx, version) = s.split_once("v").ok_or(None)?;
526        let idx: u64 = idx.parse()?;
527        let version: u64 = version.parse()?;
528        let key_data = KeyData::from_ffi((version << 32) | idx);
529        Ok(Self::from(key_data))
530    }
531}
532
533/// Edge information in the Hydro graph.
534#[derive(Debug, Clone)]
535pub struct HydroGraphEdge {
536    pub src: VizNodeKey,
537    pub dst: VizNodeKey,
538    pub edge_properties: HashSet<HydroEdgeProp>,
539    pub label: Option<String>,
540}
541
542/// Graph structure tracker for Hydro IR rendering.
543#[derive(Default)]
544pub struct HydroGraphStructure {
545    pub nodes: SlotMap<VizNodeKey, HydroGraphNode>,
546    pub edges: Vec<HydroGraphEdge>,
547    pub locations: HashMap<usize, String>, // location_id -> location_type
548}
549
550impl HydroGraphStructure {
551    pub fn new() -> Self {
552        Self::default()
553    }
554
555    pub fn add_node(
556        &mut self,
557        label: NodeLabel,
558        node_type: HydroNodeType,
559        location: Option<usize>,
560    ) -> VizNodeKey {
561        self.add_node_with_backtrace(label, node_type, location, None)
562    }
563
564    pub fn add_node_with_backtrace(
565        &mut self,
566        label: NodeLabel,
567        node_type: HydroNodeType,
568        location: Option<usize>,
569        backtrace: Option<Backtrace>,
570    ) -> VizNodeKey {
571        self.nodes.insert(HydroGraphNode {
572            label,
573            node_type,
574            location,
575            backtrace,
576        })
577    }
578
579    /// Add a node with metadata, extracting backtrace automatically
580    pub fn add_node_with_metadata(
581        &mut self,
582        label: NodeLabel,
583        node_type: HydroNodeType,
584        metadata: &HydroIrMetadata,
585    ) -> VizNodeKey {
586        let location = setup_location(self, metadata);
587        let backtrace = Some(metadata.op.backtrace.clone());
588        self.add_node_with_backtrace(label, node_type, location, backtrace)
589    }
590
591    pub fn add_edge(
592        &mut self,
593        src: VizNodeKey,
594        dst: VizNodeKey,
595        edge_properties: HashSet<HydroEdgeProp>,
596        label: Option<String>,
597    ) {
598        self.edges.push(HydroGraphEdge {
599            src,
600            dst,
601            edge_properties,
602            label,
603        });
604    }
605
606    // Legacy method for backward compatibility
607    pub fn add_edge_single(
608        &mut self,
609        src: VizNodeKey,
610        dst: VizNodeKey,
611        edge_type: HydroEdgeProp,
612        label: Option<String>,
613    ) {
614        let mut properties = HashSet::new();
615        properties.insert(edge_type);
616        self.edges.push(HydroGraphEdge {
617            src,
618            dst,
619            edge_properties: properties,
620            label,
621        });
622    }
623
624    pub fn add_location(&mut self, location_id: usize, location_type: String) {
625        self.locations.insert(location_id, location_type);
626    }
627}
628
629/// Function to extract an op_name from a print_root() result for use in labels.
630pub fn extract_op_name(full_label: String) -> String {
631    full_label
632        .split('(')
633        .next()
634        .unwrap_or("unknown")
635        .to_string()
636        .to_lowercase()
637}
638
639/// Extract a short, readable label from the full token stream label using print_root() style naming
640pub fn extract_short_label(full_label: &str) -> String {
641    // Use the same logic as extract_op_name but handle the specific cases we need for UI display
642    if let Some(op_name) = full_label.split('(').next() {
643        let base_name = op_name.to_lowercase();
644        match base_name.as_str() {
645            // Handle special cases for UI display
646            "source" => {
647                if full_label.contains("Iter") {
648                    "source_iter".to_string()
649                } else if full_label.contains("Stream") {
650                    "source_stream".to_string()
651                } else if full_label.contains("ExternalNetwork") {
652                    "external_network".to_string()
653                } else if full_label.contains("Spin") {
654                    "spin".to_string()
655                } else {
656                    "source".to_string()
657                }
658            }
659            "network" => {
660                if full_label.contains("deser") {
661                    "network(recv)".to_string()
662                } else if full_label.contains("ser") {
663                    "network(send)".to_string()
664                } else {
665                    "network".to_string()
666                }
667            }
668            // For all other cases, just use the lowercase base name (same as extract_op_name)
669            _ => base_name,
670        }
671    } else {
672        // Fallback for labels that don't follow the pattern
673        if full_label.len() > 20 {
674            format!("{}...", &full_label[..17])
675        } else {
676            full_label.to_string()
677        }
678    }
679}
680
681/// Helper function to extract location ID and type from metadata.
682fn extract_location_id(location_id: &LocationId) -> (Option<usize>, Option<String>) {
683    match location_id.root() {
684        LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
685        LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
686        _ => panic!("unexpected location type"),
687    }
688}
689
690/// Helper function to set up location in structure from metadata.
691fn setup_location(
692    structure: &mut HydroGraphStructure,
693    metadata: &HydroIrMetadata,
694) -> Option<usize> {
695    let (location_id, location_type) = extract_location_id(&metadata.location_kind);
696    if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
697        structure.add_location(loc_id, loc_type);
698    }
699    location_id
700}
701
702/// Helper function to add an edge with semantic tags extracted from metadata.
703/// This function combines collection kind extraction with network detection.
704fn add_edge_with_metadata(
705    structure: &mut HydroGraphStructure,
706    src_id: VizNodeKey,
707    dst_id: VizNodeKey,
708    src_metadata: Option<&HydroIrMetadata>,
709    dst_metadata: Option<&HydroIrMetadata>,
710    label: Option<String>,
711) {
712    let mut properties = HashSet::new();
713
714    // Extract semantic tags from source metadata's collection kind
715    if let Some(metadata) = src_metadata {
716        properties.extend(extract_edge_properties_from_collection_kind(
717            &metadata.collection_kind,
718        ));
719    }
720
721    // Add network edge tag if locations differ
722    if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
723        add_network_edge_tag(
724            &mut properties,
725            &src_meta.location_kind,
726            &dst_meta.location_kind,
727        );
728    }
729
730    // If no properties were extracted, default to Stream
731    if properties.is_empty() {
732        properties.insert(HydroEdgeProp::Stream);
733    }
734
735    structure.add_edge(src_id, dst_id, properties, label);
736}
737
738/// Helper function to write a graph structure using any GraphWrite implementation
739fn write_graph_structure<W>(
740    structure: &HydroGraphStructure,
741    graph_write: W,
742    config: &HydroWriteConfig,
743) -> Result<(), W::Err>
744where
745    W: HydroGraphWrite,
746{
747    let mut graph_write = graph_write;
748    // Write the graph
749    graph_write.write_prologue()?;
750
751    // Write node definitions
752    for (node_id, node) in structure.nodes.iter() {
753        let (location_id, location_type) = if let Some(loc_id) = node.location {
754            (
755                Some(loc_id),
756                structure.locations.get(&loc_id).map(|s| s.as_str()),
757            )
758        } else {
759            (None, None)
760        };
761
762        graph_write.write_node_definition(
763            node_id,
764            &node.label,
765            node.node_type,
766            location_id,
767            location_type,
768            node.backtrace.as_ref(),
769        )?;
770    }
771
772    // Group nodes by location if requested
773    if config.show_location_groups {
774        let mut nodes_by_location: HashMap<usize, Vec<VizNodeKey>> = HashMap::new();
775        for (node_id, node) in structure.nodes.iter() {
776            if let Some(location_id) = node.location {
777                nodes_by_location
778                    .entry(location_id)
779                    .or_default()
780                    .push(node_id);
781            }
782        }
783
784        for (&location_id, node_ids) in &nodes_by_location {
785            if let Some(location_type) = structure.locations.get(&location_id) {
786                graph_write.write_location_start(location_id, location_type)?;
787                for &node_id in node_ids {
788                    graph_write.write_node(node_id)?;
789                }
790                graph_write.write_location_end()?;
791            }
792        }
793    }
794
795    // Write edges
796    for edge in &structure.edges {
797        graph_write.write_edge(
798            edge.src,
799            edge.dst,
800            &edge.edge_properties,
801            edge.label.as_deref(),
802        )?;
803    }
804
805    graph_write.write_epilogue()?;
806    Ok(())
807}
808
809impl HydroRoot {
810    /// Build the graph structure by traversing the IR tree.
811    pub fn build_graph_structure(
812        &self,
813        structure: &mut HydroGraphStructure,
814        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
815        config: &HydroWriteConfig,
816    ) -> VizNodeKey {
817        // Helper function for sink nodes to reduce duplication
818        fn build_sink_node(
819            structure: &mut HydroGraphStructure,
820            seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
821            config: &HydroWriteConfig,
822            input: &HydroNode,
823            sink_metadata: Option<&HydroIrMetadata>,
824            label: NodeLabel,
825        ) -> VizNodeKey {
826            let input_id = input.build_graph_structure(structure, seen_tees, config);
827
828            // If no explicit metadata is provided, extract it from the input node
829            let effective_metadata = if let Some(meta) = sink_metadata {
830                Some(meta)
831            } else {
832                match input {
833                    HydroNode::Placeholder => None,
834                    // All other variants have metadata
835                    _ => Some(input.metadata()),
836                }
837            };
838
839            let location_id = effective_metadata.and_then(|m| setup_location(structure, m));
840            let sink_id = structure.add_node_with_backtrace(
841                label,
842                HydroNodeType::Sink,
843                location_id,
844                effective_metadata.map(|m| m.op.backtrace.clone()),
845            );
846
847            // Extract semantic tags from input metadata
848            let input_metadata = input.metadata();
849            add_edge_with_metadata(
850                structure,
851                input_id,
852                sink_id,
853                Some(input_metadata),
854                sink_metadata,
855                None,
856            );
857
858            sink_id
859        }
860
861        match self {
862            // Sink operations - semantic tags extracted from input metadata
863            HydroRoot::ForEach { f, input, .. } => build_sink_node(
864                structure,
865                seen_tees,
866                config,
867                input,
868                None,
869                NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
870            ),
871
872            HydroRoot::SendExternal {
873                to_external_id,
874                to_port_id,
875                input,
876                ..
877            } => build_sink_node(
878                structure,
879                seen_tees,
880                config,
881                input,
882                None,
883                NodeLabel::with_exprs(
884                    format!("send_external({}:{})", to_external_id, to_port_id),
885                    vec![],
886                ),
887            ),
888
889            HydroRoot::DestSink { sink, input, .. } => build_sink_node(
890                structure,
891                seen_tees,
892                config,
893                input,
894                None,
895                NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
896            ),
897
898            HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
899                structure,
900                seen_tees,
901                config,
902                input,
903                None,
904                NodeLabel::static_label(format!("cycle_sink({})", ident)),
905            ),
906        }
907    }
908}
909
910impl HydroNode {
911    /// Build the graph structure recursively for this node.
912    pub fn build_graph_structure(
913        &self,
914        structure: &mut HydroGraphStructure,
915        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
916        config: &HydroWriteConfig,
917    ) -> VizNodeKey {
918        use crate::location::dynamic::LocationId;
919
920        // Helper functions to reduce duplication, categorized by input/expression patterns
921
922        /// Common parameters for transform builder functions to reduce argument count
923        struct TransformParams<'a> {
924            structure: &'a mut HydroGraphStructure,
925            seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
926            config: &'a HydroWriteConfig,
927            input: &'a HydroNode,
928            metadata: &'a HydroIrMetadata,
929            op_name: String,
930            node_type: HydroNodeType,
931        }
932
933        // Single-input transform with no expressions
934        fn build_simple_transform(params: TransformParams) -> VizNodeKey {
935            let input_id = params.input.build_graph_structure(
936                params.structure,
937                params.seen_tees,
938                params.config,
939            );
940            let node_id = params.structure.add_node_with_metadata(
941                NodeLabel::Static(params.op_name.to_string()),
942                params.node_type,
943                params.metadata,
944            );
945
946            // Extract semantic tags from input metadata
947            let input_metadata = params.input.metadata();
948            add_edge_with_metadata(
949                params.structure,
950                input_id,
951                node_id,
952                Some(input_metadata),
953                Some(params.metadata),
954                None,
955            );
956
957            node_id
958        }
959
960        // Single-input transform with one expression
961        fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> VizNodeKey {
962            let input_id = params.input.build_graph_structure(
963                params.structure,
964                params.seen_tees,
965                params.config,
966            );
967            let node_id = params.structure.add_node_with_metadata(
968                NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
969                params.node_type,
970                params.metadata,
971            );
972
973            // Extract semantic tags from input metadata
974            let input_metadata = params.input.metadata();
975            add_edge_with_metadata(
976                params.structure,
977                input_id,
978                node_id,
979                Some(input_metadata),
980                Some(params.metadata),
981                None,
982            );
983
984            node_id
985        }
986
987        // Single-input transform with two expressions
988        fn build_dual_expr_transform(
989            params: TransformParams,
990            expr1: &DebugExpr,
991            expr2: &DebugExpr,
992        ) -> VizNodeKey {
993            let input_id = params.input.build_graph_structure(
994                params.structure,
995                params.seen_tees,
996                params.config,
997            );
998            let node_id = params.structure.add_node_with_metadata(
999                NodeLabel::with_exprs(
1000                    params.op_name.to_string(),
1001                    vec![expr1.clone(), expr2.clone()],
1002                ),
1003                params.node_type,
1004                params.metadata,
1005            );
1006
1007            // Extract semantic tags from input metadata
1008            let input_metadata = params.input.metadata();
1009            add_edge_with_metadata(
1010                params.structure,
1011                input_id,
1012                node_id,
1013                Some(input_metadata),
1014                Some(params.metadata),
1015                None,
1016            );
1017
1018            node_id
1019        }
1020
1021        // Helper function for source nodes
1022        fn build_source_node(
1023            structure: &mut HydroGraphStructure,
1024            metadata: &HydroIrMetadata,
1025            label: String,
1026        ) -> VizNodeKey {
1027            structure.add_node_with_metadata(
1028                NodeLabel::Static(label),
1029                HydroNodeType::Source,
1030                metadata,
1031            )
1032        }
1033
1034        match self {
1035            HydroNode::Placeholder => structure.add_node(
1036                NodeLabel::Static("PLACEHOLDER".to_string()),
1037                HydroNodeType::Transform,
1038                None,
1039            ),
1040
1041            HydroNode::Source {
1042                source, metadata, ..
1043            } => {
1044                let label = match source {
1045                    HydroSource::Stream(expr) => format!("source_stream({})", expr),
1046                    HydroSource::ExternalNetwork() => "external_network()".to_string(),
1047                    HydroSource::Iter(expr) => format!("source_iter({})", expr),
1048                    HydroSource::Spin() => "spin()".to_string(),
1049                    HydroSource::ClusterMembers(location_id) => {
1050                        format!(
1051                            "source_stream(cluster_membership_stream({:?}))",
1052                            location_id
1053                        )
1054                    }
1055                };
1056                build_source_node(structure, metadata, label)
1057            }
1058
1059            HydroNode::SingletonSource { value, metadata } => {
1060                let label = format!("singleton({})", value);
1061                build_source_node(structure, metadata, label)
1062            }
1063
1064            HydroNode::ExternalInput {
1065                from_external_id,
1066                from_port_id,
1067                metadata,
1068                ..
1069            } => build_source_node(
1070                structure,
1071                metadata,
1072                format!("external_input({}:{})", from_external_id, from_port_id),
1073            ),
1074
1075            HydroNode::CycleSource {
1076                ident, metadata, ..
1077            } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
1078
1079            HydroNode::Tee { inner, metadata } => {
1080                let ptr = inner.as_ptr();
1081                if let Some(&existing_id) = seen_tees.get(&ptr) {
1082                    return existing_id;
1083                }
1084
1085                let input_id = inner
1086                    .0
1087                    .borrow()
1088                    .build_graph_structure(structure, seen_tees, config);
1089                let tee_id = structure.add_node_with_metadata(
1090                    NodeLabel::Static(extract_op_name(self.print_root())),
1091                    HydroNodeType::Tee,
1092                    metadata,
1093                );
1094
1095                seen_tees.insert(ptr, tee_id);
1096
1097                // Extract semantic tags from input
1098                let inner_borrow = inner.0.borrow();
1099                let input_metadata = inner_borrow.metadata();
1100                add_edge_with_metadata(
1101                    structure,
1102                    input_id,
1103                    tee_id,
1104                    Some(input_metadata),
1105                    Some(metadata),
1106                    None,
1107                );
1108                drop(inner_borrow);
1109
1110                tee_id
1111            }
1112
1113            // Non-deterministic operation
1114            HydroNode::ObserveNonDet {
1115                inner, metadata, ..
1116            } => build_simple_transform(TransformParams {
1117                structure,
1118                seen_tees,
1119                config,
1120                input: inner,
1121                metadata,
1122                op_name: extract_op_name(self.print_root()),
1123                node_type: HydroNodeType::NonDeterministic,
1124            }),
1125
1126            // Transform operations with Stream edges - grouped by node/edge type
1127            HydroNode::Cast { inner, metadata }
1128            | HydroNode::DeferTick {
1129                input: inner,
1130                metadata,
1131            }
1132            | HydroNode::Enumerate {
1133                input: inner,
1134                metadata,
1135                ..
1136            }
1137            | HydroNode::Unique {
1138                input: inner,
1139                metadata,
1140            }
1141            | HydroNode::ResolveFutures {
1142                input: inner,
1143                metadata,
1144            }
1145            | HydroNode::ResolveFuturesOrdered {
1146                input: inner,
1147                metadata,
1148            } => build_simple_transform(TransformParams {
1149                structure,
1150                seen_tees,
1151                config,
1152                input: inner,
1153                metadata,
1154                op_name: extract_op_name(self.print_root()),
1155                node_type: HydroNodeType::Transform,
1156            }),
1157
1158            // Aggregation operation - semantic tags extracted from metadata
1159            HydroNode::Sort {
1160                input: inner,
1161                metadata,
1162            } => build_simple_transform(TransformParams {
1163                structure,
1164                seen_tees,
1165                config,
1166                input: inner,
1167                metadata,
1168                op_name: extract_op_name(self.print_root()),
1169                node_type: HydroNodeType::Aggregation,
1170            }),
1171
1172            // Single-expression Transform operations - grouped by node type
1173            HydroNode::Map { f, input, metadata }
1174            | HydroNode::Filter { f, input, metadata }
1175            | HydroNode::FlatMap { f, input, metadata }
1176            | HydroNode::FilterMap { f, input, metadata }
1177            | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
1178                TransformParams {
1179                    structure,
1180                    seen_tees,
1181                    config,
1182                    input,
1183                    metadata,
1184                    op_name: extract_op_name(self.print_root()),
1185                    node_type: HydroNodeType::Transform,
1186                },
1187                f,
1188            ),
1189
1190            // Single-expression Aggregation operations - grouped by node type
1191            HydroNode::Reduce { f, input, metadata }
1192            | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
1193                TransformParams {
1194                    structure,
1195                    seen_tees,
1196                    config,
1197                    input,
1198                    metadata,
1199                    op_name: extract_op_name(self.print_root()),
1200                    node_type: HydroNodeType::Aggregation,
1201                },
1202                f,
1203            ),
1204
1205            // Join-like operations with left/right edge labels - grouped by edge labeling
1206            HydroNode::Join {
1207                left,
1208                right,
1209                metadata,
1210            }
1211            | HydroNode::CrossProduct {
1212                left,
1213                right,
1214                metadata,
1215            }
1216            | HydroNode::CrossSingleton {
1217                left,
1218                right,
1219                metadata,
1220            } => {
1221                let left_id = left.build_graph_structure(structure, seen_tees, config);
1222                let right_id = right.build_graph_structure(structure, seen_tees, config);
1223                let node_id = structure.add_node_with_metadata(
1224                    NodeLabel::Static(extract_op_name(self.print_root())),
1225                    HydroNodeType::Join,
1226                    metadata,
1227                );
1228
1229                // Extract semantic tags for left edge
1230                let left_metadata = left.metadata();
1231                add_edge_with_metadata(
1232                    structure,
1233                    left_id,
1234                    node_id,
1235                    Some(left_metadata),
1236                    Some(metadata),
1237                    Some("left".to_string()),
1238                );
1239
1240                // Extract semantic tags for right edge
1241                let right_metadata = right.metadata();
1242                add_edge_with_metadata(
1243                    structure,
1244                    right_id,
1245                    node_id,
1246                    Some(right_metadata),
1247                    Some(metadata),
1248                    Some("right".to_string()),
1249                );
1250
1251                node_id
1252            }
1253
1254            // Join-like operations with pos/neg edge labels - grouped by edge labeling
1255            HydroNode::Difference {
1256                pos: left,
1257                neg: right,
1258                metadata,
1259            }
1260            | HydroNode::AntiJoin {
1261                pos: left,
1262                neg: right,
1263                metadata,
1264            } => {
1265                let left_id = left.build_graph_structure(structure, seen_tees, config);
1266                let right_id = right.build_graph_structure(structure, seen_tees, config);
1267                let node_id = structure.add_node_with_metadata(
1268                    NodeLabel::Static(extract_op_name(self.print_root())),
1269                    HydroNodeType::Join,
1270                    metadata,
1271                );
1272
1273                // Extract semantic tags for pos edge
1274                let left_metadata = left.metadata();
1275                add_edge_with_metadata(
1276                    structure,
1277                    left_id,
1278                    node_id,
1279                    Some(left_metadata),
1280                    Some(metadata),
1281                    Some("pos".to_string()),
1282                );
1283
1284                // Extract semantic tags for neg edge
1285                let right_metadata = right.metadata();
1286                add_edge_with_metadata(
1287                    structure,
1288                    right_id,
1289                    node_id,
1290                    Some(right_metadata),
1291                    Some(metadata),
1292                    Some("neg".to_string()),
1293                );
1294
1295                node_id
1296            }
1297
1298            // Dual expression transforms - consolidated using pattern matching
1299            HydroNode::Fold {
1300                init,
1301                acc,
1302                input,
1303                metadata,
1304            }
1305            | HydroNode::FoldKeyed {
1306                init,
1307                acc,
1308                input,
1309                metadata,
1310            }
1311            | HydroNode::Scan {
1312                init,
1313                acc,
1314                input,
1315                metadata,
1316            } => {
1317                let node_type = HydroNodeType::Aggregation; // All are aggregation operations
1318
1319                build_dual_expr_transform(
1320                    TransformParams {
1321                        structure,
1322                        seen_tees,
1323                        config,
1324                        input,
1325                        metadata,
1326                        op_name: extract_op_name(self.print_root()),
1327                        node_type,
1328                    },
1329                    init,
1330                    acc,
1331                )
1332            }
1333
1334            // Combination of join and transform
1335            HydroNode::ReduceKeyedWatermark {
1336                f,
1337                input,
1338                watermark,
1339                metadata,
1340            } => {
1341                let input_id = input.build_graph_structure(structure, seen_tees, config);
1342                let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
1343                let location_id = setup_location(structure, metadata);
1344                let join_node_id = structure.add_node_with_backtrace(
1345                    NodeLabel::Static(extract_op_name(self.print_root())),
1346                    HydroNodeType::Join,
1347                    location_id,
1348                    Some(metadata.op.backtrace.clone()),
1349                );
1350
1351                // Extract semantic tags for input edge
1352                let input_metadata = input.metadata();
1353                add_edge_with_metadata(
1354                    structure,
1355                    input_id,
1356                    join_node_id,
1357                    Some(input_metadata),
1358                    Some(metadata),
1359                    Some("input".to_string()),
1360                );
1361
1362                // Extract semantic tags for watermark edge
1363                let watermark_metadata = watermark.metadata();
1364                add_edge_with_metadata(
1365                    structure,
1366                    watermark_id,
1367                    join_node_id,
1368                    Some(watermark_metadata),
1369                    Some(metadata),
1370                    Some("watermark".to_string()),
1371                );
1372
1373                let node_id = structure.add_node_with_backtrace(
1374                    NodeLabel::with_exprs(
1375                        extract_op_name(self.print_root()).to_string(),
1376                        vec![f.clone()],
1377                    ),
1378                    HydroNodeType::Aggregation,
1379                    location_id,
1380                    Some(metadata.op.backtrace.clone()),
1381                );
1382
1383                // Edge from join to aggregation node
1384                let join_metadata = metadata; // Use the same metadata
1385                add_edge_with_metadata(
1386                    structure,
1387                    join_node_id,
1388                    node_id,
1389                    Some(join_metadata),
1390                    Some(metadata),
1391                    None,
1392                );
1393
1394                node_id
1395            }
1396
1397            HydroNode::Network {
1398                serialize_fn,
1399                deserialize_fn,
1400                input,
1401                metadata,
1402                ..
1403            } => {
1404                let input_id = input.build_graph_structure(structure, seen_tees, config);
1405                let _from_location_id = setup_location(structure, metadata);
1406
1407                let to_location_id = match metadata.location_kind.root() {
1408                    LocationId::Process(id) => {
1409                        structure.add_location(*id, "Process".to_string());
1410                        Some(*id)
1411                    }
1412                    LocationId::Cluster(id) => {
1413                        structure.add_location(*id, "Cluster".to_string());
1414                        Some(*id)
1415                    }
1416                    _ => None,
1417                };
1418
1419                let mut label = "network(".to_string();
1420                if serialize_fn.is_some() {
1421                    label.push_str("send");
1422                }
1423                if deserialize_fn.is_some() {
1424                    if serialize_fn.is_some() {
1425                        label.push_str(" + ");
1426                    }
1427                    label.push_str("recv");
1428                }
1429                label.push(')');
1430
1431                let network_id = structure.add_node_with_backtrace(
1432                    NodeLabel::Static(label),
1433                    HydroNodeType::Network,
1434                    to_location_id,
1435                    Some(metadata.op.backtrace.clone()),
1436                );
1437
1438                // Extract semantic tags for network edge
1439                let input_metadata = input.metadata();
1440                add_edge_with_metadata(
1441                    structure,
1442                    input_id,
1443                    network_id,
1444                    Some(input_metadata),
1445                    Some(metadata),
1446                    Some(format!("to {:?}", to_location_id)),
1447                );
1448
1449                network_id
1450            }
1451
1452            // Non-deterministic batch operation
1453            HydroNode::Batch { inner, metadata } => build_simple_transform(TransformParams {
1454                structure,
1455                seen_tees,
1456                config,
1457                input: inner,
1458                metadata,
1459                op_name: extract_op_name(self.print_root()),
1460                node_type: HydroNodeType::NonDeterministic,
1461            }),
1462
1463            HydroNode::YieldConcat { inner, .. } => {
1464                // Unpersist is typically optimized away, just pass through
1465                inner.build_graph_structure(structure, seen_tees, config)
1466            }
1467
1468            HydroNode::BeginAtomic { inner, .. } => {
1469                inner.build_graph_structure(structure, seen_tees, config)
1470            }
1471
1472            HydroNode::EndAtomic { inner, .. } => {
1473                inner.build_graph_structure(structure, seen_tees, config)
1474            }
1475
1476            HydroNode::Chain {
1477                first,
1478                second,
1479                metadata,
1480            } => {
1481                let first_id = first.build_graph_structure(structure, seen_tees, config);
1482                let second_id = second.build_graph_structure(structure, seen_tees, config);
1483                let location_id = setup_location(structure, metadata);
1484                let chain_id = structure.add_node_with_backtrace(
1485                    NodeLabel::Static(extract_op_name(self.print_root())),
1486                    HydroNodeType::Transform,
1487                    location_id,
1488                    Some(metadata.op.backtrace.clone()),
1489                );
1490
1491                // Extract semantic tags for first edge
1492                let first_metadata = first.metadata();
1493                add_edge_with_metadata(
1494                    structure,
1495                    first_id,
1496                    chain_id,
1497                    Some(first_metadata),
1498                    Some(metadata),
1499                    Some("first".to_string()),
1500                );
1501
1502                // Extract semantic tags for second edge
1503                let second_metadata = second.metadata();
1504                add_edge_with_metadata(
1505                    structure,
1506                    second_id,
1507                    chain_id,
1508                    Some(second_metadata),
1509                    Some(metadata),
1510                    Some("second".to_string()),
1511                );
1512
1513                chain_id
1514            }
1515
1516            HydroNode::ChainFirst {
1517                first,
1518                second,
1519                metadata,
1520            } => {
1521                let first_id = first.build_graph_structure(structure, seen_tees, config);
1522                let second_id = second.build_graph_structure(structure, seen_tees, config);
1523                let location_id = setup_location(structure, metadata);
1524                let chain_id = structure.add_node_with_backtrace(
1525                    NodeLabel::Static(extract_op_name(self.print_root())),
1526                    HydroNodeType::Transform,
1527                    location_id,
1528                    Some(metadata.op.backtrace.clone()),
1529                );
1530
1531                // Extract semantic tags for first edge
1532                let first_metadata = first.metadata();
1533                add_edge_with_metadata(
1534                    structure,
1535                    first_id,
1536                    chain_id,
1537                    Some(first_metadata),
1538                    Some(metadata),
1539                    Some("first".to_string()),
1540                );
1541
1542                // Extract semantic tags for second edge
1543                let second_metadata = second.metadata();
1544                add_edge_with_metadata(
1545                    structure,
1546                    second_id,
1547                    chain_id,
1548                    Some(second_metadata),
1549                    Some(metadata),
1550                    Some("second".to_string()),
1551                );
1552
1553                chain_id
1554            }
1555
1556            HydroNode::Counter {
1557                tag: _,
1558                prefix: _,
1559                duration,
1560                input,
1561                metadata,
1562            } => build_single_expr_transform(
1563                TransformParams {
1564                    structure,
1565                    seen_tees,
1566                    config,
1567                    input,
1568                    metadata,
1569                    op_name: extract_op_name(self.print_root()),
1570                    node_type: HydroNodeType::Transform,
1571                },
1572                duration,
1573            ),
1574        }
1575    }
1576}
1577
1578/// Utility functions for rendering multiple roots as a single graph.
1579/// Macro to reduce duplication in render functions.
1580macro_rules! render_hydro_ir {
1581    ($name:ident, $write_fn:ident) => {
1582        pub fn $name(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
1583            let mut output = String::new();
1584            $write_fn(&mut output, roots, config).unwrap();
1585            output
1586        }
1587    };
1588}
1589
1590/// Macro to reduce duplication in write functions.
1591macro_rules! write_hydro_ir {
1592    ($name:ident, $writer_type:ty, $constructor:expr) => {
1593        pub fn $name(
1594            output: impl std::fmt::Write,
1595            roots: &[HydroRoot],
1596            config: &HydroWriteConfig,
1597        ) -> std::fmt::Result {
1598            let mut graph_write: $writer_type = $constructor(output, config);
1599            write_hydro_ir_graph(&mut graph_write, roots, config)
1600        }
1601    };
1602}
1603
1604render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1605write_hydro_ir!(
1606    write_hydro_ir_mermaid,
1607    HydroMermaid<_>,
1608    HydroMermaid::new_with_config
1609);
1610
1611render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1612write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1613
1614// Legacy hydroscope function - now uses HydroJson for consistency
1615render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);
1616
1617// JSON rendering
1618render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
1619write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
1620
1621fn write_hydro_ir_graph<W>(
1622    graph_write: W,
1623    roots: &[HydroRoot],
1624    config: &HydroWriteConfig,
1625) -> Result<(), W::Err>
1626where
1627    W: HydroGraphWrite,
1628{
1629    let mut structure = HydroGraphStructure::new();
1630    let mut seen_tees = HashMap::new();
1631
1632    // Build the graph structure for all roots
1633    for leaf in roots {
1634        leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1635    }
1636
1637    write_graph_structure(&structure, graph_write, config)
1638}