1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8pub use super::mermaid::{HydroMermaid, escape_mermaid};
10pub use super::reactflow::HydroReactFlow;
11use crate::ir::{DebugExpr, HydroNode, HydroRoot, HydroSource};
12
13#[derive(Debug, Clone)]
15pub enum NodeLabel {
16 Static(String),
18 WithExprs {
20 op_name: String,
21 exprs: Vec<DebugExpr>,
22 },
23}
24
25impl NodeLabel {
26 pub fn static_label(s: String) -> Self {
28 Self::Static(s)
29 }
30
31 pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
33 Self::WithExprs { op_name, exprs }
34 }
35}
36
37impl std::fmt::Display for NodeLabel {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Self::Static(s) => write!(f, "{}", s),
41 Self::WithExprs { op_name, exprs } => {
42 if exprs.is_empty() {
43 write!(f, "{}()", op_name)
44 } else {
45 let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
46 write!(f, "{}({})", op_name, expr_strs.join(", "))
47 }
48 }
49 }
50 }
51}
52
53pub struct IndentedGraphWriter<W> {
56 pub write: W,
57 pub indent: usize,
58 pub config: HydroWriteConfig,
59}
60
61impl<W> IndentedGraphWriter<W> {
62 pub fn new(write: W) -> Self {
64 Self {
65 write,
66 indent: 0,
67 config: HydroWriteConfig::default(),
68 }
69 }
70
71 pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
73 Self {
74 write,
75 indent: 0,
76 config: config.clone(),
77 }
78 }
79}
80
81impl<W: Write> IndentedGraphWriter<W> {
82 pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
84 writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
85 }
86}
87
/// Alias for [`std::fmt::Error`], the error produced by formatted writes.
pub type GraphWriteError = std::fmt::Error;
90
91#[auto_impl(&mut, Box)]
93pub trait HydroGraphWrite {
94 type Err: Error;
96
97 fn write_prologue(&mut self) -> Result<(), Self::Err>;
99
100 fn write_node_definition(
102 &mut self,
103 node_id: usize,
104 node_label: &NodeLabel,
105 node_type: HydroNodeType,
106 location_id: Option<usize>,
107 location_type: Option<&str>,
108 ) -> Result<(), Self::Err>;
109
110 fn write_edge(
112 &mut self,
113 src_id: usize,
114 dst_id: usize,
115 edge_type: HydroEdgeType,
116 label: Option<&str>,
117 ) -> Result<(), Self::Err>;
118
119 fn write_location_start(
121 &mut self,
122 location_id: usize,
123 location_type: &str,
124 ) -> Result<(), Self::Err>;
125
126 fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;
128
129 fn write_location_end(&mut self) -> Result<(), Self::Err>;
131
132 fn write_epilogue(&mut self) -> Result<(), Self::Err>;
134}
135
/// Category of a graph node, passed to the writer alongside its label.
#[derive(Debug, Clone, Copy)]
pub enum HydroNodeType {
    /// A node with no upstream input (sources, external inputs, cycle sources).
    Source,
    /// A general operator (map, filter, chain, ...).
    Transform,
    /// An operator combining two inputs (join, cross product, difference, ...).
    Join,
    /// An aggregating operator (fold, reduce, scan, sort, ...).
    Aggregation,
    /// A network hop.
    Network,
    /// A terminal node built from a root of the IR.
    Sink,
    /// A tee node.
    Tee,
}
147
/// Category of a graph edge, passed to the writer alongside its endpoints.
#[derive(Debug, Clone, Copy)]
pub enum HydroEdgeType {
    /// The edge type used for most connections between operators.
    Stream,
    /// Used for the edge leading into a persist node.
    Persistent,
    /// Used for the edge leading into a network node.
    Network,
    /// Used for the edge leading into a cycle sink.
    Cycle,
}
156
/// Options controlling how a graph is rendered.
#[derive(Debug, Clone)]
pub struct HydroWriteConfig {
    /// Not read in this file; presumably toggles extra metadata in the
    /// output (confirm against the format writers).
    pub show_metadata: bool,
    /// When `true`, nodes are additionally grouped by location in the output.
    pub show_location_groups: bool,
    /// Not read in this file; presumably selects shortened node labels
    /// (confirm against the format writers).
    pub use_short_labels: bool,
    /// `(id, name)` pairs, presumably naming processes (not read in this file).
    pub process_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs, presumably naming clusters (not read in this file).
    pub cluster_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs, presumably naming externals (not read in this file).
    pub external_id_name: Vec<(usize, String)>,
}
167
168impl Default for HydroWriteConfig {
169 fn default() -> Self {
170 Self {
171 show_metadata: false,
172 show_location_groups: true,
173 use_short_labels: true, process_id_name: vec![],
175 cluster_id_name: vec![],
176 external_id_name: vec![],
177 }
178 }
179}
180
181#[derive(Debug, Default)]
183pub struct HydroGraphStructure {
184 pub nodes: HashMap<usize, (NodeLabel, HydroNodeType, Option<usize>)>, pub edges: Vec<(usize, usize, HydroEdgeType, Option<String>)>, pub locations: HashMap<usize, String>, pub next_node_id: usize,
188}
189
190impl HydroGraphStructure {
191 pub fn new() -> Self {
192 Self::default()
193 }
194
195 pub fn add_node(
196 &mut self,
197 label: NodeLabel,
198 node_type: HydroNodeType,
199 location: Option<usize>,
200 ) -> usize {
201 let node_id = self.next_node_id;
202 self.next_node_id += 1;
203 self.nodes.insert(node_id, (label, node_type, location));
204 node_id
205 }
206
207 pub fn add_edge(
208 &mut self,
209 src: usize,
210 dst: usize,
211 edge_type: HydroEdgeType,
212 label: Option<String>,
213 ) {
214 self.edges.push((src, dst, edge_type, label));
215 }
216
217 pub fn add_location(&mut self, location_id: usize, location_type: String) {
218 self.locations.insert(location_id, location_type);
219 }
220}
221
/// Returns the lowercased operator name of `full_label`: everything before
/// the first `(`, or the whole label when it contains no `(`.
///
/// An empty label, or one starting with `(`, yields an empty string.
pub fn extract_op_name(full_label: String) -> String {
    let op_name = match full_label.split_once('(') {
        Some((before_paren, _)) => before_paren,
        None => full_label.as_str(),
    };
    // `to_lowercase` already allocates the result; no intermediate copy needed.
    op_name.to_lowercase()
}
231
/// Derives a short display label from `full_label`.
///
/// The operator name is the text before the first `(` (or the whole label
/// when there is none), lowercased. Two operators get special treatment based
/// on substrings of the full label:
///
/// * `source` becomes `source_iter`, `source_stream`, `external_network` or
///   `spin` when the label contains `Iter`, `Stream`, `ExternalNetwork` or
///   `Spin` (checked in that order), and stays `source` otherwise.
/// * `network` becomes `network(recv)` when the label contains `deser`,
///   `network(send)` when it otherwise contains `ser`, and stays `network`
///   otherwise.
///
/// Every other operator is returned as its lowercased name.
pub fn extract_short_label(full_label: &str) -> String {
    // `split` always yields at least one item, so the fallback is never taken;
    // the previous truncating `else` branch was unreachable for the same reason.
    let op_name = full_label.split('(').next().unwrap_or(full_label);
    let base_name = op_name.to_lowercase();

    match base_name.as_str() {
        "source" => {
            let short = if full_label.contains("Iter") {
                "source_iter"
            } else if full_label.contains("Stream") {
                "source_stream"
            } else if full_label.contains("ExternalNetwork") {
                "external_network"
            } else if full_label.contains("Spin") {
                "spin"
            } else {
                "source"
            };
            short.to_string()
        }
        "network" => {
            // "deser" must be tested first because it also contains "ser".
            let short = if full_label.contains("deser") {
                "network(recv)"
            } else if full_label.contains("ser") {
                "network(send)"
            } else {
                "network"
            };
            short.to_string()
        }
        _ => base_name,
    }
}
273
274fn extract_location_id(metadata: &crate::ir::HydroIrMetadata) -> (Option<usize>, Option<String>) {
276 use crate::location::LocationId;
277 match &metadata.location_kind {
278 LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
279 LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
280 LocationId::Tick(_, inner) => match inner.as_ref() {
281 LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
282 LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
283 _ => (None, None),
284 },
285 }
286}
287
288fn setup_location(
290 structure: &mut HydroGraphStructure,
291 metadata: &crate::ir::HydroIrMetadata,
292) -> Option<usize> {
293 let (location_id, location_type) = extract_location_id(metadata);
294 if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
295 structure.add_location(loc_id, loc_type);
296 }
297 location_id
298}
299
300impl HydroRoot {
301 pub fn write_graph<W>(
303 &self,
304 mut graph_write: W,
305 config: &HydroWriteConfig,
306 ) -> Result<(), W::Err>
307 where
308 W: HydroGraphWrite,
309 {
310 let mut structure = HydroGraphStructure::new();
311 let mut seen_tees = HashMap::new();
312
313 let _sink_id = self.build_graph_structure(&mut structure, &mut seen_tees, config);
315
316 graph_write.write_prologue()?;
318
319 for (&node_id, (label, node_type, location)) in &structure.nodes {
321 let (location_id, location_type) = if let Some(loc_id) = location {
322 (
323 Some(*loc_id),
324 structure.locations.get(loc_id).map(|s| s.as_str()),
325 )
326 } else {
327 (None, None)
328 };
329
330 graph_write.write_node_definition(
333 node_id,
334 label,
335 *node_type,
336 location_id,
337 location_type,
338 )?;
339 }
340
341 if config.show_location_groups {
343 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
344 for (&node_id, (_, _, location)) in &structure.nodes {
345 if let Some(location_id) = location {
346 nodes_by_location
347 .entry(*location_id)
348 .or_default()
349 .push(node_id);
350 }
351 }
352
353 for (&location_id, node_ids) in &nodes_by_location {
354 if let Some(location_type) = structure.locations.get(&location_id) {
355 graph_write.write_location_start(location_id, location_type)?;
356 for &node_id in node_ids {
357 graph_write.write_node(node_id)?;
358 }
359 graph_write.write_location_end()?;
360 }
361 }
362 }
363
364 for (src_id, dst_id, edge_type, label) in &structure.edges {
366 graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
367 }
368
369 graph_write.write_epilogue()?;
370 Ok(())
371 }
372
373 pub fn build_graph_structure(
375 &self,
376 structure: &mut HydroGraphStructure,
377 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
378 config: &HydroWriteConfig,
379 ) -> usize {
380 fn build_sink_node(
382 structure: &mut HydroGraphStructure,
383 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
384 config: &HydroWriteConfig,
385 input: &HydroNode,
386 metadata: Option<&crate::ir::HydroIrMetadata>,
387 label: NodeLabel,
388 edge_type: HydroEdgeType,
389 ) -> usize {
390 let input_id = input.build_graph_structure(structure, seen_tees, config);
391 let location_id = metadata.and_then(|m| setup_location(structure, m));
392 let sink_id = structure.add_node(label, HydroNodeType::Sink, location_id);
393 structure.add_edge(input_id, sink_id, edge_type, None);
394 sink_id
395 }
396
397 match self {
398 HydroRoot::ForEach { f, input, .. } => build_sink_node(
400 structure,
401 seen_tees,
402 config,
403 input,
404 None,
405 NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
406 HydroEdgeType::Stream,
407 ),
408
409 HydroRoot::SendExternal {
410 to_external_id,
411 to_key,
412 input,
413 ..
414 } => build_sink_node(
415 structure,
416 seen_tees,
417 config,
418 input,
419 None,
420 NodeLabel::with_exprs(
421 format!("send_external({}:{})", to_external_id, to_key),
422 vec![],
423 ),
424 HydroEdgeType::Stream,
425 ),
426
427 HydroRoot::DestSink { sink, input, .. } => build_sink_node(
428 structure,
429 seen_tees,
430 config,
431 input,
432 None,
433 NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
434 HydroEdgeType::Stream,
435 ),
436
437 HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
439 structure,
440 seen_tees,
441 config,
442 input,
443 None,
444 NodeLabel::static_label(format!("cycle_sink({})", ident)),
445 HydroEdgeType::Cycle,
446 ),
447 }
448 }
449}
450
451impl HydroNode {
452 pub fn build_graph_structure(
454 &self,
455 structure: &mut HydroGraphStructure,
456 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
457 config: &HydroWriteConfig,
458 ) -> usize {
459 use crate::location::LocationId;
460
461 struct TransformParams<'a> {
465 structure: &'a mut HydroGraphStructure,
466 seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
467 config: &'a HydroWriteConfig,
468 input: &'a HydroNode,
469 metadata: &'a crate::ir::HydroIrMetadata,
470 op_name: String,
471 node_type: HydroNodeType,
472 edge_type: HydroEdgeType,
473 }
474
475 fn build_simple_transform(params: TransformParams) -> usize {
477 let input_id = params.input.build_graph_structure(
478 params.structure,
479 params.seen_tees,
480 params.config,
481 );
482 let location_id = setup_location(params.structure, params.metadata);
483 let node_id = params.structure.add_node(
484 NodeLabel::Static(params.op_name.to_string()),
485 params.node_type,
486 location_id,
487 );
488 params
489 .structure
490 .add_edge(input_id, node_id, params.edge_type, None);
491 node_id
492 }
493
494 fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
496 let input_id = params.input.build_graph_structure(
497 params.structure,
498 params.seen_tees,
499 params.config,
500 );
501 let location_id = setup_location(params.structure, params.metadata);
502 let node_id = params.structure.add_node(
503 NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
504 params.node_type,
505 location_id,
506 );
507 params
508 .structure
509 .add_edge(input_id, node_id, params.edge_type, None);
510 node_id
511 }
512
513 fn build_dual_expr_transform(
515 params: TransformParams,
516 expr1: &DebugExpr,
517 expr2: &DebugExpr,
518 ) -> usize {
519 let input_id = params.input.build_graph_structure(
520 params.structure,
521 params.seen_tees,
522 params.config,
523 );
524 let location_id = setup_location(params.structure, params.metadata);
525 let node_id = params.structure.add_node(
526 NodeLabel::with_exprs(
527 params.op_name.to_string(),
528 vec![expr1.clone(), expr2.clone()],
529 ),
530 params.node_type,
531 location_id,
532 );
533 params
534 .structure
535 .add_edge(input_id, node_id, params.edge_type, None);
536 node_id
537 }
538
539 fn build_source_node(
541 structure: &mut HydroGraphStructure,
542 metadata: &crate::ir::HydroIrMetadata,
543 label: String,
544 ) -> usize {
545 let location_id = setup_location(structure, metadata);
546 structure.add_node(NodeLabel::Static(label), HydroNodeType::Source, location_id)
547 }
548
549 match self {
550 HydroNode::Placeholder => structure.add_node(
551 NodeLabel::Static("PLACEHOLDER".to_string()),
552 HydroNodeType::Transform,
553 None,
554 ),
555
556 HydroNode::Source {
557 source, metadata, ..
558 } => {
559 let label = match source {
560 HydroSource::Stream(expr) => format!("source_stream({})", expr),
561 HydroSource::ExternalNetwork() => "external_network()".to_string(),
562 HydroSource::Iter(expr) => format!("source_iter({})", expr),
563 HydroSource::Spin() => "spin()".to_string(),
564 };
565 build_source_node(structure, metadata, label)
566 }
567
568 HydroNode::ExternalInput {
569 from_external_id,
570 from_key,
571 metadata,
572 ..
573 } => build_source_node(
574 structure,
575 metadata,
576 format!("external_input({}:{})", from_external_id, from_key),
577 ),
578
579 HydroNode::CycleSource {
580 ident, metadata, ..
581 } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
582
583 HydroNode::Tee { inner, metadata } => {
584 let ptr = inner.as_ptr();
585 if let Some(&existing_id) = seen_tees.get(&ptr) {
586 return existing_id;
587 }
588
589 let input_id = inner
590 .0
591 .borrow()
592 .build_graph_structure(structure, seen_tees, config);
593 let location_id = setup_location(structure, metadata);
594
595 let tee_id = structure.add_node(
596 NodeLabel::Static(extract_op_name(self.print_root())),
597 HydroNodeType::Tee,
598 location_id,
599 );
600
601 seen_tees.insert(ptr, tee_id);
602
603 structure.add_edge(input_id, tee_id, HydroEdgeType::Stream, None);
604
605 tee_id
606 }
607
608 HydroNode::Delta { inner, metadata }
610 | HydroNode::DeferTick {
611 input: inner,
612 metadata,
613 }
614 | HydroNode::Enumerate {
615 input: inner,
616 metadata,
617 ..
618 }
619 | HydroNode::Unique {
620 input: inner,
621 metadata,
622 }
623 | HydroNode::ResolveFutures {
624 input: inner,
625 metadata,
626 }
627 | HydroNode::ResolveFuturesOrdered {
628 input: inner,
629 metadata,
630 } => build_simple_transform(TransformParams {
631 structure,
632 seen_tees,
633 config,
634 input: inner,
635 metadata,
636 op_name: extract_op_name(self.print_root()),
637 node_type: HydroNodeType::Transform,
638 edge_type: HydroEdgeType::Stream,
639 }),
640
641 HydroNode::Persist { inner, metadata } => build_simple_transform(TransformParams {
643 structure,
644 seen_tees,
645 config,
646 input: inner,
647 metadata,
648 op_name: extract_op_name(self.print_root()),
649 node_type: HydroNodeType::Transform,
650 edge_type: HydroEdgeType::Persistent,
651 }),
652
653 HydroNode::Sort {
655 input: inner,
656 metadata,
657 } => build_simple_transform(TransformParams {
658 structure,
659 seen_tees,
660 config,
661 input: inner,
662 metadata,
663 op_name: extract_op_name(self.print_root()),
664 node_type: HydroNodeType::Aggregation,
665 edge_type: HydroEdgeType::Stream,
666 }),
667
668 HydroNode::Map { f, input, metadata }
670 | HydroNode::Filter { f, input, metadata }
671 | HydroNode::FlatMap { f, input, metadata }
672 | HydroNode::FilterMap { f, input, metadata }
673 | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
674 TransformParams {
675 structure,
676 seen_tees,
677 config,
678 input,
679 metadata,
680 op_name: extract_op_name(self.print_root()),
681 node_type: HydroNodeType::Transform,
682 edge_type: HydroEdgeType::Stream,
683 },
684 f,
685 ),
686
687 HydroNode::Reduce { f, input, metadata }
689 | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
690 TransformParams {
691 structure,
692 seen_tees,
693 config,
694 input,
695 metadata,
696 op_name: extract_op_name(self.print_root()),
697 node_type: HydroNodeType::Aggregation,
698 edge_type: HydroEdgeType::Stream,
699 },
700 f,
701 ),
702
703 HydroNode::Join {
705 left,
706 right,
707 metadata,
708 }
709 | HydroNode::CrossProduct {
710 left,
711 right,
712 metadata,
713 }
714 | HydroNode::CrossSingleton {
715 left,
716 right,
717 metadata,
718 } => {
719 let left_id = left.build_graph_structure(structure, seen_tees, config);
720 let right_id = right.build_graph_structure(structure, seen_tees, config);
721 let location_id = setup_location(structure, metadata);
722 let node_id = structure.add_node(
723 NodeLabel::Static(extract_op_name(self.print_root())),
724 HydroNodeType::Join,
725 location_id,
726 );
727 structure.add_edge(
728 left_id,
729 node_id,
730 HydroEdgeType::Stream,
731 Some("left".to_string()),
732 );
733 structure.add_edge(
734 right_id,
735 node_id,
736 HydroEdgeType::Stream,
737 Some("right".to_string()),
738 );
739 node_id
740 }
741
742 HydroNode::Difference {
744 pos: left,
745 neg: right,
746 metadata,
747 }
748 | HydroNode::AntiJoin {
749 pos: left,
750 neg: right,
751 metadata,
752 } => {
753 let left_id = left.build_graph_structure(structure, seen_tees, config);
754 let right_id = right.build_graph_structure(structure, seen_tees, config);
755 let location_id = setup_location(structure, metadata);
756 let node_id = structure.add_node(
757 NodeLabel::Static(extract_op_name(self.print_root())),
758 HydroNodeType::Join,
759 location_id,
760 );
761 structure.add_edge(
762 left_id,
763 node_id,
764 HydroEdgeType::Stream,
765 Some("pos".to_string()),
766 );
767 structure.add_edge(
768 right_id,
769 node_id,
770 HydroEdgeType::Stream,
771 Some("neg".to_string()),
772 );
773 node_id
774 }
775
776 HydroNode::Fold {
778 init,
779 acc,
780 input,
781 metadata,
782 }
783 | HydroNode::FoldKeyed {
784 init,
785 acc,
786 input,
787 metadata,
788 }
789 | HydroNode::Scan {
790 init,
791 acc,
792 input,
793 metadata,
794 } => {
795 let node_type = HydroNodeType::Aggregation; build_dual_expr_transform(
798 TransformParams {
799 structure,
800 seen_tees,
801 config,
802 input,
803 metadata,
804 op_name: extract_op_name(self.print_root()),
805 node_type,
806 edge_type: HydroEdgeType::Stream,
807 },
808 init,
809 acc,
810 )
811 }
812
813 HydroNode::ReduceKeyedWatermark {
815 f,
816 input,
817 watermark,
818 metadata,
819 } => {
820 let input_id = input.build_graph_structure(structure, seen_tees, config);
821 let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
822 let location_id = setup_location(structure, metadata);
823 let join_node_id = structure.add_node(
824 NodeLabel::Static(extract_op_name(self.print_root())),
825 HydroNodeType::Join,
826 location_id,
827 );
828 structure.add_edge(
829 input_id,
830 join_node_id,
831 HydroEdgeType::Stream,
832 Some("input".to_string()),
833 );
834 structure.add_edge(
835 watermark_id,
836 join_node_id,
837 HydroEdgeType::Stream,
838 Some("watermark".to_string()),
839 );
840
841 let node_id = structure.add_node(
842 NodeLabel::with_exprs(
843 extract_op_name(self.print_root()).to_string(),
844 vec![f.clone()],
845 ),
846 HydroNodeType::Aggregation,
847 location_id,
848 );
849 structure.add_edge(join_node_id, node_id, HydroEdgeType::Stream, None);
850 node_id
851 }
852
853 HydroNode::Network {
854 serialize_fn,
855 deserialize_fn,
856 input,
857 metadata,
858 ..
859 } => {
860 let input_id = input.build_graph_structure(structure, seen_tees, config);
861 let _from_location_id = setup_location(structure, metadata);
862
863 let to_location_id = match metadata.location_kind.root() {
864 LocationId::Process(id) => {
865 structure.add_location(*id, "Process".to_string());
866 Some(*id)
867 }
868 LocationId::Cluster(id) => {
869 structure.add_location(*id, "Cluster".to_string());
870 Some(*id)
871 }
872 _ => None,
873 };
874
875 let mut label = "network(".to_string();
876 if serialize_fn.is_some() {
877 label.push_str("ser");
878 }
879 if deserialize_fn.is_some() {
880 if serialize_fn.is_some() {
881 label.push_str(" + ");
882 }
883 label.push_str("deser");
884 }
885 label.push(')');
886
887 let network_id = structure.add_node(
888 NodeLabel::Static(label),
889 HydroNodeType::Network,
890 to_location_id,
891 );
892 structure.add_edge(
893 input_id,
894 network_id,
895 HydroEdgeType::Network,
896 Some(format!("to {:?}", to_location_id)),
897 );
898 network_id
899 }
900
901 HydroNode::Unpersist { inner, .. } => {
903 inner.build_graph_structure(structure, seen_tees, config)
905 }
906
907 HydroNode::Chain {
908 first,
909 second,
910 metadata,
911 } => {
912 let first_id = first.build_graph_structure(structure, seen_tees, config);
913 let second_id = second.build_graph_structure(structure, seen_tees, config);
914 let location_id = setup_location(structure, metadata);
915 let chain_id = structure.add_node(
916 NodeLabel::Static(extract_op_name(self.print_root())),
917 HydroNodeType::Transform,
918 location_id,
919 );
920 structure.add_edge(
921 first_id,
922 chain_id,
923 HydroEdgeType::Stream,
924 Some("first".to_string()),
925 );
926 structure.add_edge(
927 second_id,
928 chain_id,
929 HydroEdgeType::Stream,
930 Some("second".to_string()),
931 );
932 chain_id
933 }
934
935 HydroNode::Counter {
936 tag: _,
937 duration,
938 input,
939 metadata,
940 } => build_single_expr_transform(
941 TransformParams {
942 structure,
943 seen_tees,
944 config,
945 input,
946 metadata,
947 op_name: extract_op_name(self.print_root()),
948 node_type: HydroNodeType::Transform,
949 edge_type: HydroEdgeType::Stream,
950 },
951 duration,
952 ),
953 }
954 }
955}
956
/// Generates `pub fn $name(roots, config) -> String`, which runs `$write_fn`
/// against a fresh `String` and returns the result.
///
/// The generated function panics if `$write_fn` returns an error.
macro_rules! render_hydro_ir {
    ($name:ident, $write_fn:ident) => {
        pub fn $name(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
            let mut rendered = String::new();
            $write_fn(&mut rendered, roots, config).unwrap();
            rendered
        }
    };
}
968
/// Generates `pub fn $name(output, roots, config) -> std::fmt::Result`, which
/// builds a `$writer_type` by calling `$constructor(output, config)` and
/// writes the graph for `roots` through it.
macro_rules! write_hydro_ir {
    ($name:ident, $writer_type:ty, $constructor:expr) => {
        pub fn $name(
            output: impl std::fmt::Write,
            roots: &[HydroRoot],
            config: &HydroWriteConfig,
        ) -> std::fmt::Result {
            let mut writer: $writer_type = $constructor(output, config);
            write_hydro_ir_graph(&mut writer, roots, config)
        }
    };
}
982
983render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
984write_hydro_ir!(
985 write_hydro_ir_mermaid,
986 HydroMermaid<_>,
987 HydroMermaid::new_with_config
988);
989
990render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
991write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
992
993render_hydro_ir!(render_hydro_ir_reactflow, write_hydro_ir_reactflow);
994write_hydro_ir!(
995 write_hydro_ir_reactflow,
996 HydroReactFlow<_>,
997 HydroReactFlow::new
998);
999
1000fn write_hydro_ir_graph<W>(
1001 mut graph_write: W,
1002 roots: &[HydroRoot],
1003 config: &HydroWriteConfig,
1004) -> Result<(), W::Err>
1005where
1006 W: HydroGraphWrite,
1007{
1008 let mut structure = HydroGraphStructure::new();
1009 let mut seen_tees = HashMap::new();
1010
1011 for leaf in roots {
1013 leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1014 }
1015
1016 graph_write.write_prologue()?;
1018
1019 for (&node_id, (label, node_type, location)) in &structure.nodes {
1020 let (location_id, location_type) = if let Some(loc_id) = location {
1021 (
1022 Some(*loc_id),
1023 structure.locations.get(loc_id).map(|s| s.as_str()),
1024 )
1025 } else {
1026 (None, None)
1027 };
1028 graph_write.write_node_definition(
1029 node_id,
1030 label,
1031 *node_type,
1032 location_id,
1033 location_type,
1034 )?;
1035 }
1036
1037 if config.show_location_groups {
1038 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
1039 for (&node_id, (_, _, location)) in &structure.nodes {
1040 if let Some(location_id) = location {
1041 nodes_by_location
1042 .entry(*location_id)
1043 .or_default()
1044 .push(node_id);
1045 }
1046 }
1047
1048 for (&location_id, node_ids) in &nodes_by_location {
1049 if let Some(location_type) = structure.locations.get(&location_id) {
1050 graph_write.write_location_start(location_id, location_type)?;
1051 for &node_id in node_ids {
1052 graph_write.write_node(node_id)?;
1053 }
1054 graph_write.write_location_end()?;
1055 }
1056 }
1057 }
1058
1059 for (src_id, dst_id, edge_type, label) in &structure.edges {
1060 graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
1061 }
1062
1063 graph_write.write_epilogue()?;
1064 Ok(())
1065}