hydro_lang/graph/
render.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8// Re-export specific implementations
9pub use super::mermaid::{HydroMermaid, escape_mermaid};
10pub use super::reactflow::HydroReactFlow;
11use crate::compile::ir::{DebugExpr, HydroNode, HydroRoot, HydroSource};
12use crate::location::dynamic::LocationId;
13
14/// Label for a graph node - can be either a static string or contain expressions.
15#[derive(Debug, Clone)]
16pub enum NodeLabel {
17    /// A static string label
18    Static(String),
19    /// A label with an operation name and expression arguments
20    WithExprs {
21        op_name: String,
22        exprs: Vec<DebugExpr>,
23    },
24}
25
26impl NodeLabel {
27    /// Create a static label
28    pub fn static_label(s: String) -> Self {
29        Self::Static(s)
30    }
31
32    /// Create a label for an operation with multiple expression
33    pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
34        Self::WithExprs { op_name, exprs }
35    }
36}
37
38impl std::fmt::Display for NodeLabel {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        match self {
41            Self::Static(s) => write!(f, "{}", s),
42            Self::WithExprs { op_name, exprs } => {
43                if exprs.is_empty() {
44                    write!(f, "{}()", op_name)
45                } else {
46                    let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
47                    write!(f, "{}({})", op_name, expr_strs.join(", "))
48                }
49            }
50        }
51    }
52}
53
54/// Base struct for text-based graph writers that use indentation.
55/// Contains common fields shared by DOT and Mermaid writers.
56pub struct IndentedGraphWriter<W> {
57    pub write: W,
58    pub indent: usize,
59    pub config: HydroWriteConfig,
60}
61
62impl<W> IndentedGraphWriter<W> {
63    /// Create a new writer with default configuration.
64    pub fn new(write: W) -> Self {
65        Self {
66            write,
67            indent: 0,
68            config: HydroWriteConfig::default(),
69        }
70    }
71
72    /// Create a new writer with the given configuration.
73    pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
74        Self {
75            write,
76            indent: 0,
77            config: config.clone(),
78        }
79    }
80}
81
82impl<W: Write> IndentedGraphWriter<W> {
83    /// Write an indented line using the current indentation level.
84    pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
85        writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
86    }
87}
88
/// Error type shared by the graph writers; an alias for [`std::fmt::Error`],
/// which is what formatting into a `std::fmt::Write` sink reports.
pub type GraphWriteError = std::fmt::Error;
91
92/// Trait for writing textual representations of Hydro IR graphs, i.e. mermaid or dot graphs.
93#[auto_impl(&mut, Box)]
94pub trait HydroGraphWrite {
95    /// Error type emitted by writing.
96    type Err: Error;
97
98    /// Begin the graph. First method called.
99    fn write_prologue(&mut self) -> Result<(), Self::Err>;
100
101    /// Write a node definition with styling.
102    fn write_node_definition(
103        &mut self,
104        node_id: usize,
105        node_label: &NodeLabel,
106        node_type: HydroNodeType,
107        location_id: Option<usize>,
108        location_type: Option<&str>,
109    ) -> Result<(), Self::Err>;
110
111    /// Write an edge between nodes with optional labeling.
112    fn write_edge(
113        &mut self,
114        src_id: usize,
115        dst_id: usize,
116        edge_type: HydroEdgeType,
117        label: Option<&str>,
118    ) -> Result<(), Self::Err>;
119
120    /// Begin writing a location grouping (process/cluster).
121    fn write_location_start(
122        &mut self,
123        location_id: usize,
124        location_type: &str,
125    ) -> Result<(), Self::Err>;
126
127    /// Write a node within a location.
128    fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;
129
130    /// End writing a location grouping.
131    fn write_location_end(&mut self) -> Result<(), Self::Err>;
132
133    /// End the graph. Last method called.
134    fn write_epilogue(&mut self) -> Result<(), Self::Err>;
135}
136
/// Category of a Hydro IR node, used by renderers to pick a style.
#[derive(Debug, Clone, Copy)]
pub enum HydroNodeType {
    /// A node that introduces data into the graph.
    Source,
    /// A single-input operator such as `map` or `filter`.
    Transform,
    /// An operator combining two inputs, such as `join` or `difference`.
    Join,
    /// An aggregating operator such as `fold`, `reduce` or `sort`.
    Aggregation,
    /// A network operator.
    Network,
    /// A terminal node that consumes its input.
    Sink,
    /// A tee node whose output may be shared by several consumers.
    Tee,
}
148
/// Category of an edge between two Hydro IR nodes.
#[derive(Debug, Clone, Copy)]
pub enum HydroEdgeType {
    /// Edge kind used for ordinary operator inputs.
    Stream,
    /// Edge kind used for the input of a `persist` operator.
    Persistent,
    /// Edge kind used for the input of a network operator.
    Network,
    /// Edge kind used for the input of a cycle sink.
    Cycle,
}
157
/// Options controlling how a graph is written.
#[derive(Debug, Clone)]
pub struct HydroWriteConfig {
    /// Whether metadata should be shown (interpretation is up to the renderer).
    pub show_metadata: bool,
    /// Whether nodes are grouped by their location when the graph is written.
    pub show_location_groups: bool,
    /// Whether renderers should use short labels.
    pub use_short_labels: bool,
    /// `(id, name)` pairs for processes — NOTE(review): pairing inferred from the field name; confirm.
    pub process_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs for clusters — NOTE(review): pairing inferred from the field name; confirm.
    pub cluster_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs for externals — NOTE(review): pairing inferred from the field name; confirm.
    pub external_id_name: Vec<(usize, String)>,
}
168
169impl Default for HydroWriteConfig {
170    fn default() -> Self {
171        Self {
172            show_metadata: false,
173            show_location_groups: true,
174            use_short_labels: true, // Default to short labels for all renderers
175            process_id_name: vec![],
176            cluster_id_name: vec![],
177            external_id_name: vec![],
178        }
179    }
180}
181
182/// Graph structure tracker for Hydro IR rendering.
183#[derive(Debug, Default)]
184pub struct HydroGraphStructure {
185    pub nodes: HashMap<usize, (NodeLabel, HydroNodeType, Option<usize>)>, /* node_id -> (label, type, location) */
186    pub edges: Vec<(usize, usize, HydroEdgeType, Option<String>)>, // (src, dst, edge_type, label)
187    pub locations: HashMap<usize, String>,                         // location_id -> location_type
188    pub next_node_id: usize,
189}
190
191impl HydroGraphStructure {
192    pub fn new() -> Self {
193        Self::default()
194    }
195
196    pub fn add_node(
197        &mut self,
198        label: NodeLabel,
199        node_type: HydroNodeType,
200        location: Option<usize>,
201    ) -> usize {
202        let node_id = self.next_node_id;
203        self.next_node_id += 1;
204        self.nodes.insert(node_id, (label, node_type, location));
205        node_id
206    }
207
208    pub fn add_edge(
209        &mut self,
210        src: usize,
211        dst: usize,
212        edge_type: HydroEdgeType,
213        label: Option<String>,
214    ) {
215        self.edges.push((src, dst, edge_type, label));
216    }
217
218    pub fn add_location(&mut self, location_id: usize, location_type: String) {
219        self.locations.insert(location_id, location_type);
220    }
221}
222
/// Extract the operator name from a `print_root()`-style label.
///
/// The name is everything before the first `(` (or the whole label when it
/// contains no `(`), converted to lowercase.
pub fn extract_op_name(full_label: String) -> String {
    // `str::split` always yields at least one item, so the `unwrap_or`
    // fallback is purely defensive. Lowercasing the borrowed slice directly
    // avoids allocating an intermediate `String`.
    full_label
        .split('(')
        .next()
        .unwrap_or("unknown")
        .to_lowercase()
}
232
/// Extract a short, readable label from a full `print_root()`-style label.
///
/// The base name is the lowercased text before the first `(` (the whole
/// label when there is no `(`). Two base names get special treatment for UI
/// display:
///
/// * `source` is refined to `source_iter`, `source_stream`,
///   `external_network` or `spin` depending on which of `Iter`, `Stream`,
///   `ExternalNetwork` or `Spin` the full label contains (checked in that
///   order); otherwise it stays `source`.
/// * `network` becomes `network(recv)` when the label contains `deser`,
///   `network(send)` when it contains `ser`, and stays `network` otherwise.
///
/// Every other label yields its lowercased base name.
pub fn extract_short_label(full_label: &str) -> String {
    // `str::split` always yields at least one item (the entire string when
    // the delimiter is absent), so no "pattern not matched" fallback is
    // needed; the previous fallback branch could never run.
    let base_name = full_label
        .split('(')
        .next()
        .unwrap_or(full_label)
        .to_lowercase();

    let refined = match base_name.as_str() {
        "source" => {
            if full_label.contains("Iter") {
                "source_iter"
            } else if full_label.contains("Stream") {
                "source_stream"
            } else if full_label.contains("ExternalNetwork") {
                "external_network"
            } else if full_label.contains("Spin") {
                "spin"
            } else {
                "source"
            }
        }
        "network" => {
            // "deser" must be tested first because it also contains "ser".
            if full_label.contains("deser") {
                "network(recv)"
            } else if full_label.contains("ser") {
                "network(send)"
            } else {
                "network"
            }
        }
        // For all other cases, use the lowercase base name (same as `extract_op_name`).
        _ => return base_name,
    };
    refined.to_string()
}
274
275/// Helper function to extract location ID and type from metadata.
276fn extract_location_id(location_id: &LocationId) -> (Option<usize>, Option<String>) {
277    match location_id.root() {
278        LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
279        LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
280        _ => panic!("unexpected location type"),
281    }
282}
283
284/// Helper function to set up location in structure from metadata.
285fn setup_location(
286    structure: &mut HydroGraphStructure,
287    metadata: &crate::compile::ir::HydroIrMetadata,
288) -> Option<usize> {
289    let (location_id, location_type) = extract_location_id(&metadata.location_kind);
290    if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
291        structure.add_location(loc_id, loc_type);
292    }
293    location_id
294}
295
296impl HydroRoot {
297    /// Core graph writing logic that works with any GraphWrite implementation.
298    pub fn write_graph<W>(
299        &self,
300        mut graph_write: W,
301        config: &HydroWriteConfig,
302    ) -> Result<(), W::Err>
303    where
304        W: HydroGraphWrite,
305    {
306        let mut structure = HydroGraphStructure::new();
307        let mut seen_tees = HashMap::new();
308
309        // Build the graph structure by traversing the IR
310        let _sink_id = self.build_graph_structure(&mut structure, &mut seen_tees, config);
311
312        // Write the graph
313        graph_write.write_prologue()?;
314
315        // Write node definitions
316        for (&node_id, (label, node_type, location)) in &structure.nodes {
317            let (location_id, location_type) = if let Some(loc_id) = location {
318                (
319                    Some(*loc_id),
320                    structure.locations.get(loc_id).map(|s| s.as_str()),
321                )
322            } else {
323                (None, None)
324            };
325
326            // Check if this is a label that came from an expression-containing operation
327            // We can detect this by looking for the pattern "op_name(...)" and checking if we have the original expressions
328            graph_write.write_node_definition(
329                node_id,
330                label,
331                *node_type,
332                location_id,
333                location_type,
334            )?;
335        }
336
337        // Group nodes by location if requested
338        if config.show_location_groups {
339            let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
340            for (&node_id, (_, _, location)) in &structure.nodes {
341                if let Some(location_id) = location {
342                    nodes_by_location
343                        .entry(*location_id)
344                        .or_default()
345                        .push(node_id);
346                }
347            }
348
349            for (&location_id, node_ids) in &nodes_by_location {
350                if let Some(location_type) = structure.locations.get(&location_id) {
351                    graph_write.write_location_start(location_id, location_type)?;
352                    for &node_id in node_ids {
353                        graph_write.write_node(node_id)?;
354                    }
355                    graph_write.write_location_end()?;
356                }
357            }
358        }
359
360        // Write edges
361        for (src_id, dst_id, edge_type, label) in &structure.edges {
362            graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
363        }
364
365        graph_write.write_epilogue()?;
366        Ok(())
367    }
368
369    /// Build the graph structure by traversing the IR tree.
370    pub fn build_graph_structure(
371        &self,
372        structure: &mut HydroGraphStructure,
373        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
374        config: &HydroWriteConfig,
375    ) -> usize {
376        // Helper function for sink nodes to reduce duplication
377        fn build_sink_node(
378            structure: &mut HydroGraphStructure,
379            seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
380            config: &HydroWriteConfig,
381            input: &HydroNode,
382            metadata: Option<&crate::compile::ir::HydroIrMetadata>,
383            label: NodeLabel,
384            edge_type: HydroEdgeType,
385        ) -> usize {
386            let input_id = input.build_graph_structure(structure, seen_tees, config);
387            let location_id = metadata.and_then(|m| setup_location(structure, m));
388            let sink_id = structure.add_node(label, HydroNodeType::Sink, location_id);
389            structure.add_edge(input_id, sink_id, edge_type, None);
390            sink_id
391        }
392
393        match self {
394            // Sink operations with Stream edges - grouped by edge type
395            HydroRoot::ForEach { f, input, .. } => build_sink_node(
396                structure,
397                seen_tees,
398                config,
399                input,
400                None,
401                NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
402                HydroEdgeType::Stream,
403            ),
404
405            HydroRoot::SendExternal {
406                to_external_id,
407                to_key,
408                input,
409                ..
410            } => build_sink_node(
411                structure,
412                seen_tees,
413                config,
414                input,
415                None,
416                NodeLabel::with_exprs(
417                    format!("send_external({}:{})", to_external_id, to_key),
418                    vec![],
419                ),
420                HydroEdgeType::Stream,
421            ),
422
423            HydroRoot::DestSink { sink, input, .. } => build_sink_node(
424                structure,
425                seen_tees,
426                config,
427                input,
428                None,
429                NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
430                HydroEdgeType::Stream,
431            ),
432
433            // Sink operation with Cycle edge - grouped by edge type
434            HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
435                structure,
436                seen_tees,
437                config,
438                input,
439                None,
440                NodeLabel::static_label(format!("cycle_sink({})", ident)),
441                HydroEdgeType::Cycle,
442            ),
443        }
444    }
445}
446
447impl HydroNode {
448    /// Build the graph structure recursively for this node.
449    pub fn build_graph_structure(
450        &self,
451        structure: &mut HydroGraphStructure,
452        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
453        config: &HydroWriteConfig,
454    ) -> usize {
455        use crate::location::dynamic::LocationId;
456
457        // Helper functions to reduce duplication, categorized by input/expression patterns
458
459        /// Common parameters for transform builder functions to reduce argument count
460        struct TransformParams<'a> {
461            structure: &'a mut HydroGraphStructure,
462            seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
463            config: &'a HydroWriteConfig,
464            input: &'a HydroNode,
465            metadata: &'a crate::compile::ir::HydroIrMetadata,
466            op_name: String,
467            node_type: HydroNodeType,
468            edge_type: HydroEdgeType,
469        }
470
471        // Single-input transform with no expressions
472        fn build_simple_transform(params: TransformParams) -> usize {
473            let input_id = params.input.build_graph_structure(
474                params.structure,
475                params.seen_tees,
476                params.config,
477            );
478            let location_id = setup_location(params.structure, params.metadata);
479            let node_id = params.structure.add_node(
480                NodeLabel::Static(params.op_name.to_string()),
481                params.node_type,
482                location_id,
483            );
484            params
485                .structure
486                .add_edge(input_id, node_id, params.edge_type, None);
487            node_id
488        }
489
490        // Single-input transform with one expression
491        fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
492            let input_id = params.input.build_graph_structure(
493                params.structure,
494                params.seen_tees,
495                params.config,
496            );
497            let location_id = setup_location(params.structure, params.metadata);
498            let node_id = params.structure.add_node(
499                NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
500                params.node_type,
501                location_id,
502            );
503            params
504                .structure
505                .add_edge(input_id, node_id, params.edge_type, None);
506            node_id
507        }
508
509        // Single-input transform with two expressions
510        fn build_dual_expr_transform(
511            params: TransformParams,
512            expr1: &DebugExpr,
513            expr2: &DebugExpr,
514        ) -> usize {
515            let input_id = params.input.build_graph_structure(
516                params.structure,
517                params.seen_tees,
518                params.config,
519            );
520            let location_id = setup_location(params.structure, params.metadata);
521            let node_id = params.structure.add_node(
522                NodeLabel::with_exprs(
523                    params.op_name.to_string(),
524                    vec![expr1.clone(), expr2.clone()],
525                ),
526                params.node_type,
527                location_id,
528            );
529            params
530                .structure
531                .add_edge(input_id, node_id, params.edge_type, None);
532            node_id
533        }
534
535        // Helper function for source nodes
536        fn build_source_node(
537            structure: &mut HydroGraphStructure,
538            metadata: &crate::compile::ir::HydroIrMetadata,
539            label: String,
540        ) -> usize {
541            let location_id = setup_location(structure, metadata);
542            structure.add_node(NodeLabel::Static(label), HydroNodeType::Source, location_id)
543        }
544
545        match self {
546            HydroNode::Placeholder => structure.add_node(
547                NodeLabel::Static("PLACEHOLDER".to_string()),
548                HydroNodeType::Transform,
549                None,
550            ),
551
552            HydroNode::Source {
553                source, metadata, ..
554            } => {
555                let label = match source {
556                    HydroSource::Stream(expr) => format!("source_stream({})", expr),
557                    HydroSource::ExternalNetwork() => "external_network()".to_string(),
558                    HydroSource::Iter(expr) => format!("source_iter({})", expr),
559                    HydroSource::Spin() => "spin()".to_string(),
560                };
561                build_source_node(structure, metadata, label)
562            }
563
564            HydroNode::ExternalInput {
565                from_external_id,
566                from_key,
567                metadata,
568                ..
569            } => build_source_node(
570                structure,
571                metadata,
572                format!("external_input({}:{})", from_external_id, from_key),
573            ),
574
575            HydroNode::CycleSource {
576                ident, metadata, ..
577            } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
578
579            HydroNode::Tee { inner, metadata } => {
580                let ptr = inner.as_ptr();
581                if let Some(&existing_id) = seen_tees.get(&ptr) {
582                    return existing_id;
583                }
584
585                let input_id = inner
586                    .0
587                    .borrow()
588                    .build_graph_structure(structure, seen_tees, config);
589                let location_id = setup_location(structure, metadata);
590
591                let tee_id = structure.add_node(
592                    NodeLabel::Static(extract_op_name(self.print_root())),
593                    HydroNodeType::Tee,
594                    location_id,
595                );
596
597                seen_tees.insert(ptr, tee_id);
598
599                structure.add_edge(input_id, tee_id, HydroEdgeType::Stream, None);
600
601                tee_id
602            }
603
604            // Transform operations with Stream edges - grouped by node/edge type
605            HydroNode::Cast { inner, metadata }
606            | HydroNode::ObserveNonDet { inner, metadata }
607            | HydroNode::DeferTick {
608                input: inner,
609                metadata,
610            }
611            | HydroNode::Enumerate {
612                input: inner,
613                metadata,
614                ..
615            }
616            | HydroNode::Unique {
617                input: inner,
618                metadata,
619            }
620            | HydroNode::ResolveFutures {
621                input: inner,
622                metadata,
623            }
624            | HydroNode::ResolveFuturesOrdered {
625                input: inner,
626                metadata,
627            } => build_simple_transform(TransformParams {
628                structure,
629                seen_tees,
630                config,
631                input: inner,
632                metadata,
633                op_name: extract_op_name(self.print_root()),
634                node_type: HydroNodeType::Transform,
635                edge_type: HydroEdgeType::Stream,
636            }),
637
638            // Transform operation with Persistent edge - grouped by node/edge type
639            HydroNode::Persist { inner, metadata } => build_simple_transform(TransformParams {
640                structure,
641                seen_tees,
642                config,
643                input: inner,
644                metadata,
645                op_name: extract_op_name(self.print_root()),
646                node_type: HydroNodeType::Transform,
647                edge_type: HydroEdgeType::Persistent,
648            }),
649
650            // Aggregation operation with Stream edge - grouped by node/edge type
651            HydroNode::Sort {
652                input: inner,
653                metadata,
654            } => build_simple_transform(TransformParams {
655                structure,
656                seen_tees,
657                config,
658                input: inner,
659                metadata,
660                op_name: extract_op_name(self.print_root()),
661                node_type: HydroNodeType::Aggregation,
662                edge_type: HydroEdgeType::Stream,
663            }),
664
665            // Single-expression Transform operations - grouped by node type
666            HydroNode::Map { f, input, metadata }
667            | HydroNode::Filter { f, input, metadata }
668            | HydroNode::FlatMap { f, input, metadata }
669            | HydroNode::FilterMap { f, input, metadata }
670            | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
671                TransformParams {
672                    structure,
673                    seen_tees,
674                    config,
675                    input,
676                    metadata,
677                    op_name: extract_op_name(self.print_root()),
678                    node_type: HydroNodeType::Transform,
679                    edge_type: HydroEdgeType::Stream,
680                },
681                f,
682            ),
683
684            // Single-expression Aggregation operations - grouped by node type
685            HydroNode::Reduce { f, input, metadata }
686            | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
687                TransformParams {
688                    structure,
689                    seen_tees,
690                    config,
691                    input,
692                    metadata,
693                    op_name: extract_op_name(self.print_root()),
694                    node_type: HydroNodeType::Aggregation,
695                    edge_type: HydroEdgeType::Stream,
696                },
697                f,
698            ),
699
700            // Join-like operations with left/right edge labels - grouped by edge labeling
701            HydroNode::Join {
702                left,
703                right,
704                metadata,
705            }
706            | HydroNode::CrossProduct {
707                left,
708                right,
709                metadata,
710            }
711            | HydroNode::CrossSingleton {
712                left,
713                right,
714                metadata,
715            } => {
716                let left_id = left.build_graph_structure(structure, seen_tees, config);
717                let right_id = right.build_graph_structure(structure, seen_tees, config);
718                let location_id = setup_location(structure, metadata);
719                let node_id = structure.add_node(
720                    NodeLabel::Static(extract_op_name(self.print_root())),
721                    HydroNodeType::Join,
722                    location_id,
723                );
724                structure.add_edge(
725                    left_id,
726                    node_id,
727                    HydroEdgeType::Stream,
728                    Some("left".to_string()),
729                );
730                structure.add_edge(
731                    right_id,
732                    node_id,
733                    HydroEdgeType::Stream,
734                    Some("right".to_string()),
735                );
736                node_id
737            }
738
739            // Join-like operations with pos/neg edge labels - grouped by edge labeling
740            HydroNode::Difference {
741                pos: left,
742                neg: right,
743                metadata,
744            }
745            | HydroNode::AntiJoin {
746                pos: left,
747                neg: right,
748                metadata,
749            } => {
750                let left_id = left.build_graph_structure(structure, seen_tees, config);
751                let right_id = right.build_graph_structure(structure, seen_tees, config);
752                let location_id = setup_location(structure, metadata);
753                let node_id = structure.add_node(
754                    NodeLabel::Static(extract_op_name(self.print_root())),
755                    HydroNodeType::Join,
756                    location_id,
757                );
758                structure.add_edge(
759                    left_id,
760                    node_id,
761                    HydroEdgeType::Stream,
762                    Some("pos".to_string()),
763                );
764                structure.add_edge(
765                    right_id,
766                    node_id,
767                    HydroEdgeType::Stream,
768                    Some("neg".to_string()),
769                );
770                node_id
771            }
772
773            // Dual expression transforms - consolidated using pattern matching
774            HydroNode::Fold {
775                init,
776                acc,
777                input,
778                metadata,
779            }
780            | HydroNode::FoldKeyed {
781                init,
782                acc,
783                input,
784                metadata,
785            }
786            | HydroNode::Scan {
787                init,
788                acc,
789                input,
790                metadata,
791            } => {
792                let node_type = HydroNodeType::Aggregation; // All are aggregation operations
793
794                build_dual_expr_transform(
795                    TransformParams {
796                        structure,
797                        seen_tees,
798                        config,
799                        input,
800                        metadata,
801                        op_name: extract_op_name(self.print_root()),
802                        node_type,
803                        edge_type: HydroEdgeType::Stream,
804                    },
805                    init,
806                    acc,
807                )
808            }
809
810            // Combination of join and transform
811            HydroNode::ReduceKeyedWatermark {
812                f,
813                input,
814                watermark,
815                metadata,
816            } => {
817                let input_id = input.build_graph_structure(structure, seen_tees, config);
818                let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
819                let location_id = setup_location(structure, metadata);
820                let join_node_id = structure.add_node(
821                    NodeLabel::Static(extract_op_name(self.print_root())),
822                    HydroNodeType::Join,
823                    location_id,
824                );
825                structure.add_edge(
826                    input_id,
827                    join_node_id,
828                    HydroEdgeType::Stream,
829                    Some("input".to_string()),
830                );
831                structure.add_edge(
832                    watermark_id,
833                    join_node_id,
834                    HydroEdgeType::Stream,
835                    Some("watermark".to_string()),
836                );
837
838                let node_id = structure.add_node(
839                    NodeLabel::with_exprs(
840                        extract_op_name(self.print_root()).to_string(),
841                        vec![f.clone()],
842                    ),
843                    HydroNodeType::Aggregation,
844                    location_id,
845                );
846                structure.add_edge(join_node_id, node_id, HydroEdgeType::Stream, None);
847                node_id
848            }
849
850            HydroNode::Network {
851                serialize_fn,
852                deserialize_fn,
853                input,
854                metadata,
855                ..
856            } => {
857                let input_id = input.build_graph_structure(structure, seen_tees, config);
858                let _from_location_id = setup_location(structure, metadata);
859
860                let to_location_id = match metadata.location_kind.root() {
861                    LocationId::Process(id) => {
862                        structure.add_location(*id, "Process".to_string());
863                        Some(*id)
864                    }
865                    LocationId::Cluster(id) => {
866                        structure.add_location(*id, "Cluster".to_string());
867                        Some(*id)
868                    }
869                    _ => None,
870                };
871
872                let mut label = "network(".to_string();
873                if serialize_fn.is_some() {
874                    label.push_str("ser");
875                }
876                if deserialize_fn.is_some() {
877                    if serialize_fn.is_some() {
878                        label.push_str(" + ");
879                    }
880                    label.push_str("deser");
881                }
882                label.push(')');
883
884                let network_id = structure.add_node(
885                    NodeLabel::Static(label),
886                    HydroNodeType::Network,
887                    to_location_id,
888                );
889                structure.add_edge(
890                    input_id,
891                    network_id,
892                    HydroEdgeType::Network,
893                    Some(format!("to {:?}", to_location_id)),
894                );
895                network_id
896            }
897
898            // Handle remaining node types
899            HydroNode::Batch { inner, .. } => {
900                // Unpersist is typically optimized away, just pass through
901                inner.build_graph_structure(structure, seen_tees, config)
902            }
903
904            HydroNode::YieldConcat { inner, .. } => {
905                // Unpersist is typically optimized away, just pass through
906                inner.build_graph_structure(structure, seen_tees, config)
907            }
908
909            HydroNode::BeginAtomic { inner, .. } => {
910                inner.build_graph_structure(structure, seen_tees, config)
911            }
912
913            HydroNode::EndAtomic { inner, .. } => {
914                inner.build_graph_structure(structure, seen_tees, config)
915            }
916
917            HydroNode::Chain {
918                first,
919                second,
920                metadata,
921            } => {
922                let first_id = first.build_graph_structure(structure, seen_tees, config);
923                let second_id = second.build_graph_structure(structure, seen_tees, config);
924                let location_id = setup_location(structure, metadata);
925                let chain_id = structure.add_node(
926                    NodeLabel::Static(extract_op_name(self.print_root())),
927                    HydroNodeType::Transform,
928                    location_id,
929                );
930                structure.add_edge(
931                    first_id,
932                    chain_id,
933                    HydroEdgeType::Stream,
934                    Some("first".to_string()),
935                );
936                structure.add_edge(
937                    second_id,
938                    chain_id,
939                    HydroEdgeType::Stream,
940                    Some("second".to_string()),
941                );
942                chain_id
943            }
944
945            HydroNode::ChainFirst {
946                first,
947                second,
948                metadata,
949            } => {
950                let first_id = first.build_graph_structure(structure, seen_tees, config);
951                let second_id = second.build_graph_structure(structure, seen_tees, config);
952                let location_id = setup_location(structure, metadata);
953                let chain_id = structure.add_node(
954                    NodeLabel::Static(extract_op_name(self.print_root())),
955                    HydroNodeType::Transform,
956                    location_id,
957                );
958                structure.add_edge(
959                    first_id,
960                    chain_id,
961                    HydroEdgeType::Stream,
962                    Some("first".to_string()),
963                );
964                structure.add_edge(
965                    second_id,
966                    chain_id,
967                    HydroEdgeType::Stream,
968                    Some("second".to_string()),
969                );
970                chain_id
971            }
972
973            HydroNode::Counter {
974                tag: _,
975                prefix: _,
976                duration,
977                input,
978                metadata,
979            } => build_single_expr_transform(
980                TransformParams {
981                    structure,
982                    seen_tees,
983                    config,
984                    input,
985                    metadata,
986                    op_name: extract_op_name(self.print_root()),
987                    node_type: HydroNodeType::Transform,
988                    edge_type: HydroEdgeType::Stream,
989                },
990                duration,
991            ),
992        }
993    }
994}
995
/// Generates a `pub fn` that renders a set of roots into an owned `String`.
///
/// `$render_fn` is the name of the generated function. `$writer_fn` is the
/// streaming function it delegates to, invoked as
/// `$writer_fn(&mut String, roots, config)`.
macro_rules! render_hydro_ir {
    ($render_fn:ident, $writer_fn:ident) => {
        /// Renders `roots` into a freshly allocated `String`.
        ///
        /// # Panics
        ///
        /// Panics if the underlying write function returns an error.
        pub fn $render_fn(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
            let mut rendered = String::new();
            $writer_fn(&mut rendered, roots, config).unwrap();
            rendered
        }
    };
}
1007
/// Generates a `pub fn` that streams a set of roots into any `std::fmt::Write` sink.
///
/// * `$fn_name` - name of the generated function.
/// * `$writer` - type of the graph writer to build (may contain `_` placeholders).
/// * `$make_writer` - callable invoked as `$make_writer(output, config)` to build it.
///
/// The generated function hands the writer to `write_hydro_ir_graph` and returns
/// its result unchanged.
macro_rules! write_hydro_ir {
    ($fn_name:ident, $writer:ty, $make_writer:expr) => {
        /// Writes the graph for `roots` to `output`.
        ///
        /// # Errors
        ///
        /// Returns the error produced while writing the graph, if any.
        pub fn $fn_name(
            output: impl std::fmt::Write,
            roots: &[HydroRoot],
            config: &HydroWriteConfig,
        ) -> std::fmt::Result {
            let mut writer: $writer = $make_writer(output, config);
            write_hydro_ir_graph(&mut writer, roots, config)
        }
    };
}
1021
1022render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1023write_hydro_ir!(
1024    write_hydro_ir_mermaid,
1025    HydroMermaid<_>,
1026    HydroMermaid::new_with_config
1027);
1028
1029render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1030write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1031
1032render_hydro_ir!(render_hydro_ir_reactflow, write_hydro_ir_reactflow);
1033write_hydro_ir!(
1034    write_hydro_ir_reactflow,
1035    HydroReactFlow<_>,
1036    HydroReactFlow::new
1037);
1038
1039fn write_hydro_ir_graph<W>(
1040    mut graph_write: W,
1041    roots: &[HydroRoot],
1042    config: &HydroWriteConfig,
1043) -> Result<(), W::Err>
1044where
1045    W: HydroGraphWrite,
1046{
1047    let mut structure = HydroGraphStructure::new();
1048    let mut seen_tees = HashMap::new();
1049
1050    // Build the graph structure for all roots
1051    for leaf in roots {
1052        leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1053    }
1054
1055    // Write the graph using the same logic as individual roots
1056    graph_write.write_prologue()?;
1057
1058    for (&node_id, (label, node_type, location)) in &structure.nodes {
1059        let (location_id, location_type) = if let Some(loc_id) = location {
1060            (
1061                Some(*loc_id),
1062                structure.locations.get(loc_id).map(|s| s.as_str()),
1063            )
1064        } else {
1065            (None, None)
1066        };
1067        graph_write.write_node_definition(
1068            node_id,
1069            label,
1070            *node_type,
1071            location_id,
1072            location_type,
1073        )?;
1074    }
1075
1076    if config.show_location_groups {
1077        let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
1078        for (&node_id, (_, _, location)) in &structure.nodes {
1079            if let Some(location_id) = location {
1080                nodes_by_location
1081                    .entry(*location_id)
1082                    .or_default()
1083                    .push(node_id);
1084            }
1085        }
1086
1087        for (&location_id, node_ids) in &nodes_by_location {
1088            if let Some(location_type) = structure.locations.get(&location_id) {
1089                graph_write.write_location_start(location_id, location_type)?;
1090                for &node_id in node_ids {
1091                    graph_write.write_node(node_id)?;
1092                }
1093                graph_write.write_location_end()?;
1094            }
1095        }
1096    }
1097
1098    for (src_id, dst_id, edge_type, label) in &structure.edges {
1099        graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
1100    }
1101
1102    graph_write.write_epilogue()?;
1103    Ok(())
1104}