hydro_lang/graph/
render.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8// Re-export specific implementations
9pub use super::mermaid::{HydroMermaid, escape_mermaid};
10pub use super::reactflow::HydroReactFlow;
11use crate::ir::{DebugExpr, HydroNode, HydroRoot, HydroSource};
12
/// Label for a graph node - can be either a static string or contain expressions.
///
/// The `Display` implementation renders `Static` verbatim and `WithExprs` as
/// `op_name(expr, expr, ...)`.
#[derive(Debug, Clone)]
pub enum NodeLabel {
    /// A static string label
    Static(String),
    /// A label with an operation name and expression arguments
    WithExprs {
        /// Name of the operation, printed before the parenthesised argument list.
        op_name: String,
        /// Expression arguments of the operation; may be empty.
        exprs: Vec<DebugExpr>,
    },
}
24
25impl NodeLabel {
26    /// Create a static label
27    pub fn static_label(s: String) -> Self {
28        Self::Static(s)
29    }
30
31    /// Create a label for an operation with multiple expression
32    pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
33        Self::WithExprs { op_name, exprs }
34    }
35}
36
37impl std::fmt::Display for NodeLabel {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            Self::Static(s) => write!(f, "{}", s),
41            Self::WithExprs { op_name, exprs } => {
42                if exprs.is_empty() {
43                    write!(f, "{}()", op_name)
44                } else {
45                    let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
46                    write!(f, "{}({})", op_name, expr_strs.join(", "))
47                }
48            }
49        }
50    }
51}
52
/// Base struct for text-based graph writers that use indentation.
/// Contains common fields shared by DOT and Mermaid writers.
pub struct IndentedGraphWriter<W> {
    /// Underlying sink that receives the generated text.
    pub write: W,
    /// Number of spaces `writeln_indented` puts in front of each line.
    pub indent: usize,
    /// Rendering options this writer was created with.
    pub config: HydroWriteConfig,
}
60
61impl<W> IndentedGraphWriter<W> {
62    /// Create a new writer with default configuration.
63    pub fn new(write: W) -> Self {
64        Self {
65            write,
66            indent: 0,
67            config: HydroWriteConfig::default(),
68        }
69    }
70
71    /// Create a new writer with the given configuration.
72    pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
73        Self {
74            write,
75            indent: 0,
76            config: config.clone(),
77        }
78    }
79}
80
81impl<W: Write> IndentedGraphWriter<W> {
82    /// Write an indented line using the current indentation level.
83    pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
84        writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
85    }
86}
87
/// Common error type used by all graph writers.
///
/// This is an alias for [`std::fmt::Error`], the error type returned by
/// `IndentedGraphWriter::writeln_indented`.
pub type GraphWriteError = std::fmt::Error;
90
/// Trait for writing textual representations of Hydro IR graphs, i.e. mermaid or dot graphs.
///
/// `HydroRoot::write_graph` drives an implementation in this order:
/// `write_prologue`, one `write_node_definition` per node, then (only when
/// location groups are enabled) `write_location_start` / `write_node`... /
/// `write_location_end` per location, then one `write_edge` per edge, and
/// finally `write_epilogue`.
#[auto_impl(&mut, Box)]
pub trait HydroGraphWrite {
    /// Error type emitted by writing.
    type Err: Error;

    /// Begin the graph. First method called.
    fn write_prologue(&mut self) -> Result<(), Self::Err>;

    /// Write a node definition with styling.
    ///
    /// `location_id` is the location the node was recorded with, if any, and
    /// `location_type` is the type string registered for that location
    /// (`None` when the node has no location or the location is unknown).
    fn write_node_definition(
        &mut self,
        node_id: usize,
        node_label: &NodeLabel,
        node_type: HydroNodeType,
        location_id: Option<usize>,
        location_type: Option<&str>,
    ) -> Result<(), Self::Err>;

    /// Write an edge between nodes with optional labeling.
    ///
    /// The edge runs from `src_id` to `dst_id`; `label` is `None` for
    /// unlabeled edges.
    fn write_edge(
        &mut self,
        src_id: usize,
        dst_id: usize,
        edge_type: HydroEdgeType,
        label: Option<&str>,
    ) -> Result<(), Self::Err>;

    /// Begin writing a location grouping (process/cluster).
    ///
    /// Followed by a `write_node` call for each member node and then a
    /// matching `write_location_end`.
    fn write_location_start(
        &mut self,
        location_id: usize,
        location_type: &str,
    ) -> Result<(), Self::Err>;

    /// Write a node within a location.
    ///
    /// Only called between `write_location_start` and `write_location_end`.
    fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;

    /// End writing a location grouping.
    fn write_location_end(&mut self) -> Result<(), Self::Err>;

    /// End the graph. Last method called.
    fn write_epilogue(&mut self) -> Result<(), Self::Err>;
}
135
/// Types of nodes in Hydro IR for styling purposes.
#[derive(Debug, Clone, Copy)]
pub enum HydroNodeType {
    /// Nodes that produce data without an input edge (e.g. sources, external
    /// inputs, cycle sources).
    Source,
    /// Single-input operators such as `map`, `filter` or `persist`; also used
    /// for placeholder nodes.
    Transform,
    /// Two-input operators such as joins, cross products, difference and anti-join.
    Join,
    /// Operators that combine many items, such as `fold`, `reduce`, `scan` or `sort`.
    Aggregation,
    /// Nodes that represent a network hop.
    Network,
    /// Terminal nodes created for the roots of the IR.
    Sink,
    /// Tee nodes, whose output may feed several consumers.
    Tee,
}
147
/// Types of edges in Hydro IR.
#[derive(Debug, Clone, Copy)]
pub enum HydroEdgeType {
    /// Ordinary data-flow edge; used for most operators.
    Stream,
    /// Edge leading into a `persist` node.
    Persistent,
    /// Edge leading into a network node.
    Network,
    /// Edge leading into a cycle sink.
    Cycle,
}
156
/// Configuration for graph writing.
#[derive(Debug, Clone)]
pub struct HydroWriteConfig {
    /// NOTE(review): not read in the visible part of this file; presumably
    /// tells the individual writers whether to include node metadata.
    pub show_metadata: bool,
    /// When `true`, `HydroRoot::write_graph` emits a grouping for every
    /// location that contains at least one node.
    pub show_location_groups: bool,
    /// Presumably selects abbreviated node labels in the writers; defaults to
    /// `true` - TODO confirm against the writer implementations.
    pub use_short_labels: bool,
    /// `(id, name)` pairs - presumably display names for process ids; TODO confirm.
    pub process_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs - presumably display names for cluster ids; TODO confirm.
    pub cluster_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs - presumably display names for external ids; TODO confirm.
    pub external_id_name: Vec<(usize, String)>,
}
167
168impl Default for HydroWriteConfig {
169    fn default() -> Self {
170        Self {
171            show_metadata: false,
172            show_location_groups: true,
173            use_short_labels: true, // Default to short labels for all renderers
174            process_id_name: vec![],
175            cluster_id_name: vec![],
176            external_id_name: vec![],
177        }
178    }
179}
180
/// Graph structure tracker for Hydro IR rendering.
#[derive(Debug, Default)]
pub struct HydroGraphStructure {
    /// node_id -> (label, type, location)
    pub nodes: HashMap<usize, (NodeLabel, HydroNodeType, Option<usize>)>,
    /// (src, dst, edge_type, label), in insertion order
    pub edges: Vec<(usize, usize, HydroEdgeType, Option<String>)>,
    /// location_id -> location_type
    pub locations: HashMap<usize, String>,
    /// Id that will be handed to the next node added via `add_node`.
    pub next_node_id: usize,
}
189
190impl HydroGraphStructure {
191    pub fn new() -> Self {
192        Self::default()
193    }
194
195    pub fn add_node(
196        &mut self,
197        label: NodeLabel,
198        node_type: HydroNodeType,
199        location: Option<usize>,
200    ) -> usize {
201        let node_id = self.next_node_id;
202        self.next_node_id += 1;
203        self.nodes.insert(node_id, (label, node_type, location));
204        node_id
205    }
206
207    pub fn add_edge(
208        &mut self,
209        src: usize,
210        dst: usize,
211        edge_type: HydroEdgeType,
212        label: Option<String>,
213    ) {
214        self.edges.push((src, dst, edge_type, label));
215    }
216
217    pub fn add_location(&mut self, location_id: usize, location_type: String) {
218        self.locations.insert(location_id, location_type);
219    }
220}
221
/// Extracts the operation name from a `print_root()` result for use in labels.
///
/// The operation name is everything before the first `(` (or the whole label
/// when there is no `(`), converted to lowercase.
pub fn extract_op_name(full_label: String) -> String {
    // `split_once` borrows the prefix, so the only allocation is the lowercase copy.
    let op_name = match full_label.split_once('(') {
        Some((head, _)) => head,
        None => full_label.as_str(),
    };
    op_name.to_lowercase()
}
231
/// Extract a short, readable label from the full token stream label using print_root() style naming.
///
/// The base name is the lowercase text before the first `(` (or the whole
/// label when there is no `(`). Two base names are refined further by looking
/// at the complete label:
///
/// * `source` becomes `source_iter`, `source_stream`, `external_network` or
///   `spin` when the label mentions `Iter`, `Stream`, `ExternalNetwork` or
///   `Spin` (checked in that order), otherwise it stays `source`.
/// * `network` becomes `network(recv)` when the label mentions `deser`,
///   `network(send)` when it mentions `ser`, otherwise it stays `network`.
///
/// Every other label is reduced to its base name.
pub fn extract_short_label(full_label: &str) -> String {
    let base_name = full_label
        .split_once('(')
        .map_or(full_label, |(head, _)| head)
        .to_lowercase();

    match base_name.as_str() {
        // Handle special cases for UI display
        "source" => {
            let short = if full_label.contains("Iter") {
                "source_iter"
            } else if full_label.contains("Stream") {
                "source_stream"
            } else if full_label.contains("ExternalNetwork") {
                "external_network"
            } else if full_label.contains("Spin") {
                "spin"
            } else {
                "source"
            };
            short.to_string()
        }
        "network" => {
            // "deser" must be tested first because it contains "ser".
            let short = if full_label.contains("deser") {
                "network(recv)"
            } else if full_label.contains("ser") {
                "network(send)"
            } else {
                "network"
            };
            short.to_string()
        }
        // For all other cases, just use the lowercase base name (same as extract_op_name)
        _ => base_name,
    }
}
273
274/// Helper function to extract location ID and type from metadata.
275fn extract_location_id(metadata: &crate::ir::HydroIrMetadata) -> (Option<usize>, Option<String>) {
276    use crate::location::LocationId;
277    match &metadata.location_kind {
278        LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
279        LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
280        LocationId::Tick(_, inner) => match inner.as_ref() {
281            LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
282            LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
283            _ => (None, None),
284        },
285    }
286}
287
288/// Helper function to set up location in structure from metadata.
289fn setup_location(
290    structure: &mut HydroGraphStructure,
291    metadata: &crate::ir::HydroIrMetadata,
292) -> Option<usize> {
293    let (location_id, location_type) = extract_location_id(metadata);
294    if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
295        structure.add_location(loc_id, loc_type);
296    }
297    location_id
298}
299
300impl HydroRoot {
301    /// Core graph writing logic that works with any GraphWrite implementation.
302    pub fn write_graph<W>(
303        &self,
304        mut graph_write: W,
305        config: &HydroWriteConfig,
306    ) -> Result<(), W::Err>
307    where
308        W: HydroGraphWrite,
309    {
310        let mut structure = HydroGraphStructure::new();
311        let mut seen_tees = HashMap::new();
312
313        // Build the graph structure by traversing the IR
314        let _sink_id = self.build_graph_structure(&mut structure, &mut seen_tees, config);
315
316        // Write the graph
317        graph_write.write_prologue()?;
318
319        // Write node definitions
320        for (&node_id, (label, node_type, location)) in &structure.nodes {
321            let (location_id, location_type) = if let Some(loc_id) = location {
322                (
323                    Some(*loc_id),
324                    structure.locations.get(loc_id).map(|s| s.as_str()),
325                )
326            } else {
327                (None, None)
328            };
329
330            // Check if this is a label that came from an expression-containing operation
331            // We can detect this by looking for the pattern "op_name(...)" and checking if we have the original expressions
332            graph_write.write_node_definition(
333                node_id,
334                label,
335                *node_type,
336                location_id,
337                location_type,
338            )?;
339        }
340
341        // Group nodes by location if requested
342        if config.show_location_groups {
343            let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
344            for (&node_id, (_, _, location)) in &structure.nodes {
345                if let Some(location_id) = location {
346                    nodes_by_location
347                        .entry(*location_id)
348                        .or_default()
349                        .push(node_id);
350                }
351            }
352
353            for (&location_id, node_ids) in &nodes_by_location {
354                if let Some(location_type) = structure.locations.get(&location_id) {
355                    graph_write.write_location_start(location_id, location_type)?;
356                    for &node_id in node_ids {
357                        graph_write.write_node(node_id)?;
358                    }
359                    graph_write.write_location_end()?;
360                }
361            }
362        }
363
364        // Write edges
365        for (src_id, dst_id, edge_type, label) in &structure.edges {
366            graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
367        }
368
369        graph_write.write_epilogue()?;
370        Ok(())
371    }
372
373    /// Build the graph structure by traversing the IR tree.
374    pub fn build_graph_structure(
375        &self,
376        structure: &mut HydroGraphStructure,
377        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
378        config: &HydroWriteConfig,
379    ) -> usize {
380        // Helper function for sink nodes to reduce duplication
381        fn build_sink_node(
382            structure: &mut HydroGraphStructure,
383            seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
384            config: &HydroWriteConfig,
385            input: &HydroNode,
386            metadata: Option<&crate::ir::HydroIrMetadata>,
387            label: NodeLabel,
388            edge_type: HydroEdgeType,
389        ) -> usize {
390            let input_id = input.build_graph_structure(structure, seen_tees, config);
391            let location_id = metadata.and_then(|m| setup_location(structure, m));
392            let sink_id = structure.add_node(label, HydroNodeType::Sink, location_id);
393            structure.add_edge(input_id, sink_id, edge_type, None);
394            sink_id
395        }
396
397        match self {
398            // Sink operations with Stream edges - grouped by edge type
399            HydroRoot::ForEach { f, input, .. } => build_sink_node(
400                structure,
401                seen_tees,
402                config,
403                input,
404                None,
405                NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
406                HydroEdgeType::Stream,
407            ),
408
409            HydroRoot::SendExternal {
410                to_external_id,
411                to_key,
412                input,
413                ..
414            } => build_sink_node(
415                structure,
416                seen_tees,
417                config,
418                input,
419                None,
420                NodeLabel::with_exprs(
421                    format!("send_external({}:{})", to_external_id, to_key),
422                    vec![],
423                ),
424                HydroEdgeType::Stream,
425            ),
426
427            HydroRoot::DestSink { sink, input, .. } => build_sink_node(
428                structure,
429                seen_tees,
430                config,
431                input,
432                None,
433                NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
434                HydroEdgeType::Stream,
435            ),
436
437            // Sink operation with Cycle edge - grouped by edge type
438            HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
439                structure,
440                seen_tees,
441                config,
442                input,
443                None,
444                NodeLabel::static_label(format!("cycle_sink({})", ident)),
445                HydroEdgeType::Cycle,
446            ),
447        }
448    }
449}
450
451impl HydroNode {
452    /// Build the graph structure recursively for this node.
453    pub fn build_graph_structure(
454        &self,
455        structure: &mut HydroGraphStructure,
456        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
457        config: &HydroWriteConfig,
458    ) -> usize {
459        use crate::location::LocationId;
460
461        // Helper functions to reduce duplication, categorized by input/expression patterns
462
463        /// Common parameters for transform builder functions to reduce argument count
464        struct TransformParams<'a> {
465            structure: &'a mut HydroGraphStructure,
466            seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
467            config: &'a HydroWriteConfig,
468            input: &'a HydroNode,
469            metadata: &'a crate::ir::HydroIrMetadata,
470            op_name: String,
471            node_type: HydroNodeType,
472            edge_type: HydroEdgeType,
473        }
474
475        // Single-input transform with no expressions
476        fn build_simple_transform(params: TransformParams) -> usize {
477            let input_id = params.input.build_graph_structure(
478                params.structure,
479                params.seen_tees,
480                params.config,
481            );
482            let location_id = setup_location(params.structure, params.metadata);
483            let node_id = params.structure.add_node(
484                NodeLabel::Static(params.op_name.to_string()),
485                params.node_type,
486                location_id,
487            );
488            params
489                .structure
490                .add_edge(input_id, node_id, params.edge_type, None);
491            node_id
492        }
493
494        // Single-input transform with one expression
495        fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
496            let input_id = params.input.build_graph_structure(
497                params.structure,
498                params.seen_tees,
499                params.config,
500            );
501            let location_id = setup_location(params.structure, params.metadata);
502            let node_id = params.structure.add_node(
503                NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
504                params.node_type,
505                location_id,
506            );
507            params
508                .structure
509                .add_edge(input_id, node_id, params.edge_type, None);
510            node_id
511        }
512
513        // Single-input transform with two expressions
514        fn build_dual_expr_transform(
515            params: TransformParams,
516            expr1: &DebugExpr,
517            expr2: &DebugExpr,
518        ) -> usize {
519            let input_id = params.input.build_graph_structure(
520                params.structure,
521                params.seen_tees,
522                params.config,
523            );
524            let location_id = setup_location(params.structure, params.metadata);
525            let node_id = params.structure.add_node(
526                NodeLabel::with_exprs(
527                    params.op_name.to_string(),
528                    vec![expr1.clone(), expr2.clone()],
529                ),
530                params.node_type,
531                location_id,
532            );
533            params
534                .structure
535                .add_edge(input_id, node_id, params.edge_type, None);
536            node_id
537        }
538
539        // Helper function for source nodes
540        fn build_source_node(
541            structure: &mut HydroGraphStructure,
542            metadata: &crate::ir::HydroIrMetadata,
543            label: String,
544        ) -> usize {
545            let location_id = setup_location(structure, metadata);
546            structure.add_node(NodeLabel::Static(label), HydroNodeType::Source, location_id)
547        }
548
549        match self {
550            HydroNode::Placeholder => structure.add_node(
551                NodeLabel::Static("PLACEHOLDER".to_string()),
552                HydroNodeType::Transform,
553                None,
554            ),
555
556            HydroNode::Source {
557                source, metadata, ..
558            } => {
559                let label = match source {
560                    HydroSource::Stream(expr) => format!("source_stream({})", expr),
561                    HydroSource::ExternalNetwork() => "external_network()".to_string(),
562                    HydroSource::Iter(expr) => format!("source_iter({})", expr),
563                    HydroSource::Spin() => "spin()".to_string(),
564                };
565                build_source_node(structure, metadata, label)
566            }
567
568            HydroNode::ExternalInput {
569                from_external_id,
570                from_key,
571                metadata,
572                ..
573            } => build_source_node(
574                structure,
575                metadata,
576                format!("external_input({}:{})", from_external_id, from_key),
577            ),
578
579            HydroNode::CycleSource {
580                ident, metadata, ..
581            } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
582
583            HydroNode::Tee { inner, metadata } => {
584                let ptr = inner.as_ptr();
585                if let Some(&existing_id) = seen_tees.get(&ptr) {
586                    return existing_id;
587                }
588
589                let input_id = inner
590                    .0
591                    .borrow()
592                    .build_graph_structure(structure, seen_tees, config);
593                let location_id = setup_location(structure, metadata);
594
595                let tee_id = structure.add_node(
596                    NodeLabel::Static(extract_op_name(self.print_root())),
597                    HydroNodeType::Tee,
598                    location_id,
599                );
600
601                seen_tees.insert(ptr, tee_id);
602
603                structure.add_edge(input_id, tee_id, HydroEdgeType::Stream, None);
604
605                tee_id
606            }
607
608            // Transform operations with Stream edges - grouped by node/edge type
609            HydroNode::Delta { inner, metadata }
610            | HydroNode::DeferTick {
611                input: inner,
612                metadata,
613            }
614            | HydroNode::Enumerate {
615                input: inner,
616                metadata,
617                ..
618            }
619            | HydroNode::Unique {
620                input: inner,
621                metadata,
622            }
623            | HydroNode::ResolveFutures {
624                input: inner,
625                metadata,
626            }
627            | HydroNode::ResolveFuturesOrdered {
628                input: inner,
629                metadata,
630            } => build_simple_transform(TransformParams {
631                structure,
632                seen_tees,
633                config,
634                input: inner,
635                metadata,
636                op_name: extract_op_name(self.print_root()),
637                node_type: HydroNodeType::Transform,
638                edge_type: HydroEdgeType::Stream,
639            }),
640
641            // Transform operation with Persistent edge - grouped by node/edge type
642            HydroNode::Persist { inner, metadata } => build_simple_transform(TransformParams {
643                structure,
644                seen_tees,
645                config,
646                input: inner,
647                metadata,
648                op_name: extract_op_name(self.print_root()),
649                node_type: HydroNodeType::Transform,
650                edge_type: HydroEdgeType::Persistent,
651            }),
652
653            // Aggregation operation with Stream edge - grouped by node/edge type
654            HydroNode::Sort {
655                input: inner,
656                metadata,
657            } => build_simple_transform(TransformParams {
658                structure,
659                seen_tees,
660                config,
661                input: inner,
662                metadata,
663                op_name: extract_op_name(self.print_root()),
664                node_type: HydroNodeType::Aggregation,
665                edge_type: HydroEdgeType::Stream,
666            }),
667
668            // Single-expression Transform operations - grouped by node type
669            HydroNode::Map { f, input, metadata }
670            | HydroNode::Filter { f, input, metadata }
671            | HydroNode::FlatMap { f, input, metadata }
672            | HydroNode::FilterMap { f, input, metadata }
673            | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
674                TransformParams {
675                    structure,
676                    seen_tees,
677                    config,
678                    input,
679                    metadata,
680                    op_name: extract_op_name(self.print_root()),
681                    node_type: HydroNodeType::Transform,
682                    edge_type: HydroEdgeType::Stream,
683                },
684                f,
685            ),
686
687            // Single-expression Aggregation operations - grouped by node type
688            HydroNode::Reduce { f, input, metadata }
689            | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
690                TransformParams {
691                    structure,
692                    seen_tees,
693                    config,
694                    input,
695                    metadata,
696                    op_name: extract_op_name(self.print_root()),
697                    node_type: HydroNodeType::Aggregation,
698                    edge_type: HydroEdgeType::Stream,
699                },
700                f,
701            ),
702
703            // Join-like operations with left/right edge labels - grouped by edge labeling
704            HydroNode::Join {
705                left,
706                right,
707                metadata,
708            }
709            | HydroNode::CrossProduct {
710                left,
711                right,
712                metadata,
713            }
714            | HydroNode::CrossSingleton {
715                left,
716                right,
717                metadata,
718            } => {
719                let left_id = left.build_graph_structure(structure, seen_tees, config);
720                let right_id = right.build_graph_structure(structure, seen_tees, config);
721                let location_id = setup_location(structure, metadata);
722                let node_id = structure.add_node(
723                    NodeLabel::Static(extract_op_name(self.print_root())),
724                    HydroNodeType::Join,
725                    location_id,
726                );
727                structure.add_edge(
728                    left_id,
729                    node_id,
730                    HydroEdgeType::Stream,
731                    Some("left".to_string()),
732                );
733                structure.add_edge(
734                    right_id,
735                    node_id,
736                    HydroEdgeType::Stream,
737                    Some("right".to_string()),
738                );
739                node_id
740            }
741
742            // Join-like operations with pos/neg edge labels - grouped by edge labeling
743            HydroNode::Difference {
744                pos: left,
745                neg: right,
746                metadata,
747            }
748            | HydroNode::AntiJoin {
749                pos: left,
750                neg: right,
751                metadata,
752            } => {
753                let left_id = left.build_graph_structure(structure, seen_tees, config);
754                let right_id = right.build_graph_structure(structure, seen_tees, config);
755                let location_id = setup_location(structure, metadata);
756                let node_id = structure.add_node(
757                    NodeLabel::Static(extract_op_name(self.print_root())),
758                    HydroNodeType::Join,
759                    location_id,
760                );
761                structure.add_edge(
762                    left_id,
763                    node_id,
764                    HydroEdgeType::Stream,
765                    Some("pos".to_string()),
766                );
767                structure.add_edge(
768                    right_id,
769                    node_id,
770                    HydroEdgeType::Stream,
771                    Some("neg".to_string()),
772                );
773                node_id
774            }
775
776            // Dual expression transforms - consolidated using pattern matching
777            HydroNode::Fold {
778                init,
779                acc,
780                input,
781                metadata,
782            }
783            | HydroNode::FoldKeyed {
784                init,
785                acc,
786                input,
787                metadata,
788            }
789            | HydroNode::Scan {
790                init,
791                acc,
792                input,
793                metadata,
794            } => {
795                let node_type = HydroNodeType::Aggregation; // All are aggregation operations
796
797                build_dual_expr_transform(
798                    TransformParams {
799                        structure,
800                        seen_tees,
801                        config,
802                        input,
803                        metadata,
804                        op_name: extract_op_name(self.print_root()),
805                        node_type,
806                        edge_type: HydroEdgeType::Stream,
807                    },
808                    init,
809                    acc,
810                )
811            }
812
813            // Combination of join and transform
814            HydroNode::ReduceKeyedWatermark {
815                f,
816                input,
817                watermark,
818                metadata,
819            } => {
820                let input_id = input.build_graph_structure(structure, seen_tees, config);
821                let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
822                let location_id = setup_location(structure, metadata);
823                let join_node_id = structure.add_node(
824                    NodeLabel::Static(extract_op_name(self.print_root())),
825                    HydroNodeType::Join,
826                    location_id,
827                );
828                structure.add_edge(
829                    input_id,
830                    join_node_id,
831                    HydroEdgeType::Stream,
832                    Some("input".to_string()),
833                );
834                structure.add_edge(
835                    watermark_id,
836                    join_node_id,
837                    HydroEdgeType::Stream,
838                    Some("watermark".to_string()),
839                );
840
841                let node_id = structure.add_node(
842                    NodeLabel::with_exprs(
843                        extract_op_name(self.print_root()).to_string(),
844                        vec![f.clone()],
845                    ),
846                    HydroNodeType::Aggregation,
847                    location_id,
848                );
849                structure.add_edge(join_node_id, node_id, HydroEdgeType::Stream, None);
850                node_id
851            }
852
853            HydroNode::Network {
854                serialize_fn,
855                deserialize_fn,
856                input,
857                metadata,
858                ..
859            } => {
860                let input_id = input.build_graph_structure(structure, seen_tees, config);
861                let _from_location_id = setup_location(structure, metadata);
862
863                let to_location_id = match metadata.location_kind.root() {
864                    LocationId::Process(id) => {
865                        structure.add_location(*id, "Process".to_string());
866                        Some(*id)
867                    }
868                    LocationId::Cluster(id) => {
869                        structure.add_location(*id, "Cluster".to_string());
870                        Some(*id)
871                    }
872                    _ => None,
873                };
874
875                let mut label = "network(".to_string();
876                if serialize_fn.is_some() {
877                    label.push_str("ser");
878                }
879                if deserialize_fn.is_some() {
880                    if serialize_fn.is_some() {
881                        label.push_str(" + ");
882                    }
883                    label.push_str("deser");
884                }
885                label.push(')');
886
887                let network_id = structure.add_node(
888                    NodeLabel::Static(label),
889                    HydroNodeType::Network,
890                    to_location_id,
891                );
892                structure.add_edge(
893                    input_id,
894                    network_id,
895                    HydroEdgeType::Network,
896                    Some(format!("to {:?}", to_location_id)),
897                );
898                network_id
899            }
900
901            // Handle remaining node types
902            HydroNode::Unpersist { inner, .. } => {
903                // Unpersist is typically optimized away, just pass through
904                inner.build_graph_structure(structure, seen_tees, config)
905            }
906
907            HydroNode::Chain {
908                first,
909                second,
910                metadata,
911            } => {
912                let first_id = first.build_graph_structure(structure, seen_tees, config);
913                let second_id = second.build_graph_structure(structure, seen_tees, config);
914                let location_id = setup_location(structure, metadata);
915                let chain_id = structure.add_node(
916                    NodeLabel::Static(extract_op_name(self.print_root())),
917                    HydroNodeType::Transform,
918                    location_id,
919                );
920                structure.add_edge(
921                    first_id,
922                    chain_id,
923                    HydroEdgeType::Stream,
924                    Some("first".to_string()),
925                );
926                structure.add_edge(
927                    second_id,
928                    chain_id,
929                    HydroEdgeType::Stream,
930                    Some("second".to_string()),
931                );
932                chain_id
933            }
934
935            HydroNode::Counter {
936                tag: _,
937                duration,
938                input,
939                metadata,
940            } => build_single_expr_transform(
941                TransformParams {
942                    structure,
943                    seen_tees,
944                    config,
945                    input,
946                    metadata,
947                    op_name: extract_op_name(self.print_root()),
948                    node_type: HydroNodeType::Transform,
949                    edge_type: HydroEdgeType::Stream,
950                },
951                duration,
952            ),
953        }
954    }
955}
956
/// Generates a `pub fn $name(roots, config) -> String` that renders several
/// roots as one graph and hands the text back as an owned `String`.
///
/// `$write_fn` is the matching streaming function; the generated wrapper calls
/// it with a fresh `String` as the sink.
///
/// The generated function panics (via `unwrap`) if `$write_fn` returns an error.
macro_rules! render_hydro_ir {
    ($name:ident, $write_fn:ident) => {
        pub fn $name(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
            let mut rendered = String::new();
            let outcome = $write_fn(&mut rendered, roots, config);
            outcome.unwrap();
            rendered
        }
    };
}
968
/// Generates a `pub fn $name(output, roots, config) -> std::fmt::Result` that
/// streams several roots as one graph into `output`.
///
/// * `$writer_type` - type annotation for the format-specific writer
///   (may contain `_` so the sink type is inferred).
/// * `$constructor` - callable invoked as `$constructor(output, config)` to
///   build that writer.
///
/// The generated function builds the writer and passes a mutable borrow of it
/// to `write_hydro_ir_graph`, returning that call's result unchanged.
macro_rules! write_hydro_ir {
    ($name:ident, $writer_type:ty, $constructor:expr) => {
        pub fn $name(
            output: impl std::fmt::Write,
            roots: &[HydroRoot],
            config: &HydroWriteConfig,
        ) -> std::fmt::Result {
            let mut writer: $writer_type = $constructor(output, config);
            write_hydro_ir_graph(&mut writer, roots, config)
        }
    };
}
982
983render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
984write_hydro_ir!(
985    write_hydro_ir_mermaid,
986    HydroMermaid<_>,
987    HydroMermaid::new_with_config
988);
989
990render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
991write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
992
993render_hydro_ir!(render_hydro_ir_reactflow, write_hydro_ir_reactflow);
994write_hydro_ir!(
995    write_hydro_ir_reactflow,
996    HydroReactFlow<_>,
997    HydroReactFlow::new
998);
999
1000fn write_hydro_ir_graph<W>(
1001    mut graph_write: W,
1002    roots: &[HydroRoot],
1003    config: &HydroWriteConfig,
1004) -> Result<(), W::Err>
1005where
1006    W: HydroGraphWrite,
1007{
1008    let mut structure = HydroGraphStructure::new();
1009    let mut seen_tees = HashMap::new();
1010
1011    // Build the graph structure for all roots
1012    for leaf in roots {
1013        leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1014    }
1015
1016    // Write the graph using the same logic as individual roots
1017    graph_write.write_prologue()?;
1018
1019    for (&node_id, (label, node_type, location)) in &structure.nodes {
1020        let (location_id, location_type) = if let Some(loc_id) = location {
1021            (
1022                Some(*loc_id),
1023                structure.locations.get(loc_id).map(|s| s.as_str()),
1024            )
1025        } else {
1026            (None, None)
1027        };
1028        graph_write.write_node_definition(
1029            node_id,
1030            label,
1031            *node_type,
1032            location_id,
1033            location_type,
1034        )?;
1035    }
1036
1037    if config.show_location_groups {
1038        let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
1039        for (&node_id, (_, _, location)) in &structure.nodes {
1040            if let Some(location_id) = location {
1041                nodes_by_location
1042                    .entry(*location_id)
1043                    .or_default()
1044                    .push(node_id);
1045            }
1046        }
1047
1048        for (&location_id, node_ids) in &nodes_by_location {
1049            if let Some(location_type) = structure.locations.get(&location_id) {
1050                graph_write.write_location_start(location_id, location_type)?;
1051                for &node_id in node_ids {
1052                    graph_write.write_node(node_id)?;
1053                }
1054                graph_write.write_location_end()?;
1055            }
1056        }
1057    }
1058
1059    for (src_id, dst_id, edge_type, label) in &structure.edges {
1060        graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
1061    }
1062
1063    graph_write.write_epilogue()?;
1064    Ok(())
1065}