1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8pub use super::mermaid::{HydroMermaid, escape_mermaid};
10pub use super::reactflow::HydroReactFlow;
11use crate::compile::ir::{DebugExpr, HydroNode, HydroRoot, HydroSource};
12use crate::location::dynamic::LocationId;
13
14#[derive(Debug, Clone)]
16pub enum NodeLabel {
17 Static(String),
19 WithExprs {
21 op_name: String,
22 exprs: Vec<DebugExpr>,
23 },
24}
25
26impl NodeLabel {
27 pub fn static_label(s: String) -> Self {
29 Self::Static(s)
30 }
31
32 pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
34 Self::WithExprs { op_name, exprs }
35 }
36}
37
38impl std::fmt::Display for NodeLabel {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 match self {
41 Self::Static(s) => write!(f, "{}", s),
42 Self::WithExprs { op_name, exprs } => {
43 if exprs.is_empty() {
44 write!(f, "{}()", op_name)
45 } else {
46 let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
47 write!(f, "{}({})", op_name, expr_strs.join(", "))
48 }
49 }
50 }
51 }
52}
53
54pub struct IndentedGraphWriter<W> {
57 pub write: W,
58 pub indent: usize,
59 pub config: HydroWriteConfig,
60}
61
62impl<W> IndentedGraphWriter<W> {
63 pub fn new(write: W) -> Self {
65 Self {
66 write,
67 indent: 0,
68 config: HydroWriteConfig::default(),
69 }
70 }
71
72 pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
74 Self {
75 write,
76 indent: 0,
77 config: config.clone(),
78 }
79 }
80}
81
82impl<W: Write> IndentedGraphWriter<W> {
83 pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
85 writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
86 }
87}
88
89pub type GraphWriteError = std::fmt::Error;
91
92#[auto_impl(&mut, Box)]
94pub trait HydroGraphWrite {
95 type Err: Error;
97
98 fn write_prologue(&mut self) -> Result<(), Self::Err>;
100
101 fn write_node_definition(
103 &mut self,
104 node_id: usize,
105 node_label: &NodeLabel,
106 node_type: HydroNodeType,
107 location_id: Option<usize>,
108 location_type: Option<&str>,
109 ) -> Result<(), Self::Err>;
110
111 fn write_edge(
113 &mut self,
114 src_id: usize,
115 dst_id: usize,
116 edge_type: HydroEdgeType,
117 label: Option<&str>,
118 ) -> Result<(), Self::Err>;
119
120 fn write_location_start(
122 &mut self,
123 location_id: usize,
124 location_type: &str,
125 ) -> Result<(), Self::Err>;
126
127 fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;
129
130 fn write_location_end(&mut self) -> Result<(), Self::Err>;
132
133 fn write_epilogue(&mut self) -> Result<(), Self::Err>;
135}
136
137#[derive(Debug, Clone, Copy)]
139pub enum HydroNodeType {
140 Source,
141 Transform,
142 Join,
143 Aggregation,
144 Network,
145 Sink,
146 Tee,
147}
148
149#[derive(Debug, Clone, Copy)]
151pub enum HydroEdgeType {
152 Stream,
153 Persistent,
154 Network,
155 Cycle,
156}
157
158#[derive(Debug, Clone)]
160pub struct HydroWriteConfig {
161 pub show_metadata: bool,
162 pub show_location_groups: bool,
163 pub use_short_labels: bool,
164 pub process_id_name: Vec<(usize, String)>,
165 pub cluster_id_name: Vec<(usize, String)>,
166 pub external_id_name: Vec<(usize, String)>,
167}
168
169impl Default for HydroWriteConfig {
170 fn default() -> Self {
171 Self {
172 show_metadata: false,
173 show_location_groups: true,
174 use_short_labels: true, process_id_name: vec![],
176 cluster_id_name: vec![],
177 external_id_name: vec![],
178 }
179 }
180}
181
182#[derive(Debug, Default)]
184pub struct HydroGraphStructure {
185 pub nodes: HashMap<usize, (NodeLabel, HydroNodeType, Option<usize>)>, pub edges: Vec<(usize, usize, HydroEdgeType, Option<String>)>, pub locations: HashMap<usize, String>, pub next_node_id: usize,
189}
190
191impl HydroGraphStructure {
192 pub fn new() -> Self {
193 Self::default()
194 }
195
196 pub fn add_node(
197 &mut self,
198 label: NodeLabel,
199 node_type: HydroNodeType,
200 location: Option<usize>,
201 ) -> usize {
202 let node_id = self.next_node_id;
203 self.next_node_id += 1;
204 self.nodes.insert(node_id, (label, node_type, location));
205 node_id
206 }
207
208 pub fn add_edge(
209 &mut self,
210 src: usize,
211 dst: usize,
212 edge_type: HydroEdgeType,
213 label: Option<String>,
214 ) {
215 self.edges.push((src, dst, edge_type, label));
216 }
217
218 pub fn add_location(&mut self, location_id: usize, location_type: String) {
219 self.locations.insert(location_id, location_type);
220 }
221}
222
223pub fn extract_op_name(full_label: String) -> String {
225 full_label
226 .split('(')
227 .next()
228 .unwrap_or("unknown")
229 .to_string()
230 .to_lowercase()
231}
232
233pub fn extract_short_label(full_label: &str) -> String {
235 if let Some(op_name) = full_label.split('(').next() {
237 let base_name = op_name.to_lowercase();
238 match base_name.as_str() {
239 "source" => {
241 if full_label.contains("Iter") {
242 "source_iter".to_string()
243 } else if full_label.contains("Stream") {
244 "source_stream".to_string()
245 } else if full_label.contains("ExternalNetwork") {
246 "external_network".to_string()
247 } else if full_label.contains("Spin") {
248 "spin".to_string()
249 } else {
250 "source".to_string()
251 }
252 }
253 "network" => {
254 if full_label.contains("deser") {
255 "network(recv)".to_string()
256 } else if full_label.contains("ser") {
257 "network(send)".to_string()
258 } else {
259 "network".to_string()
260 }
261 }
262 _ => base_name,
264 }
265 } else {
266 if full_label.len() > 20 {
268 format!("{}...", &full_label[..17])
269 } else {
270 full_label.to_string()
271 }
272 }
273}
274
275fn extract_location_id(location_id: &LocationId) -> (Option<usize>, Option<String>) {
277 match location_id.root() {
278 LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
279 LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
280 _ => panic!("unexpected location type"),
281 }
282}
283
284fn setup_location(
286 structure: &mut HydroGraphStructure,
287 metadata: &crate::compile::ir::HydroIrMetadata,
288) -> Option<usize> {
289 let (location_id, location_type) = extract_location_id(&metadata.location_kind);
290 if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
291 structure.add_location(loc_id, loc_type);
292 }
293 location_id
294}
295
296impl HydroRoot {
297 pub fn write_graph<W>(
299 &self,
300 mut graph_write: W,
301 config: &HydroWriteConfig,
302 ) -> Result<(), W::Err>
303 where
304 W: HydroGraphWrite,
305 {
306 let mut structure = HydroGraphStructure::new();
307 let mut seen_tees = HashMap::new();
308
309 let _sink_id = self.build_graph_structure(&mut structure, &mut seen_tees, config);
311
312 graph_write.write_prologue()?;
314
315 for (&node_id, (label, node_type, location)) in &structure.nodes {
317 let (location_id, location_type) = if let Some(loc_id) = location {
318 (
319 Some(*loc_id),
320 structure.locations.get(loc_id).map(|s| s.as_str()),
321 )
322 } else {
323 (None, None)
324 };
325
326 graph_write.write_node_definition(
329 node_id,
330 label,
331 *node_type,
332 location_id,
333 location_type,
334 )?;
335 }
336
337 if config.show_location_groups {
339 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
340 for (&node_id, (_, _, location)) in &structure.nodes {
341 if let Some(location_id) = location {
342 nodes_by_location
343 .entry(*location_id)
344 .or_default()
345 .push(node_id);
346 }
347 }
348
349 for (&location_id, node_ids) in &nodes_by_location {
350 if let Some(location_type) = structure.locations.get(&location_id) {
351 graph_write.write_location_start(location_id, location_type)?;
352 for &node_id in node_ids {
353 graph_write.write_node(node_id)?;
354 }
355 graph_write.write_location_end()?;
356 }
357 }
358 }
359
360 for (src_id, dst_id, edge_type, label) in &structure.edges {
362 graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
363 }
364
365 graph_write.write_epilogue()?;
366 Ok(())
367 }
368
369 pub fn build_graph_structure(
371 &self,
372 structure: &mut HydroGraphStructure,
373 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
374 config: &HydroWriteConfig,
375 ) -> usize {
376 fn build_sink_node(
378 structure: &mut HydroGraphStructure,
379 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
380 config: &HydroWriteConfig,
381 input: &HydroNode,
382 metadata: Option<&crate::compile::ir::HydroIrMetadata>,
383 label: NodeLabel,
384 edge_type: HydroEdgeType,
385 ) -> usize {
386 let input_id = input.build_graph_structure(structure, seen_tees, config);
387 let location_id = metadata.and_then(|m| setup_location(structure, m));
388 let sink_id = structure.add_node(label, HydroNodeType::Sink, location_id);
389 structure.add_edge(input_id, sink_id, edge_type, None);
390 sink_id
391 }
392
393 match self {
394 HydroRoot::ForEach { f, input, .. } => build_sink_node(
396 structure,
397 seen_tees,
398 config,
399 input,
400 None,
401 NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
402 HydroEdgeType::Stream,
403 ),
404
405 HydroRoot::SendExternal {
406 to_external_id,
407 to_key,
408 input,
409 ..
410 } => build_sink_node(
411 structure,
412 seen_tees,
413 config,
414 input,
415 None,
416 NodeLabel::with_exprs(
417 format!("send_external({}:{})", to_external_id, to_key),
418 vec![],
419 ),
420 HydroEdgeType::Stream,
421 ),
422
423 HydroRoot::DestSink { sink, input, .. } => build_sink_node(
424 structure,
425 seen_tees,
426 config,
427 input,
428 None,
429 NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
430 HydroEdgeType::Stream,
431 ),
432
433 HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
435 structure,
436 seen_tees,
437 config,
438 input,
439 None,
440 NodeLabel::static_label(format!("cycle_sink({})", ident)),
441 HydroEdgeType::Cycle,
442 ),
443 }
444 }
445}
446
447impl HydroNode {
448 pub fn build_graph_structure(
450 &self,
451 structure: &mut HydroGraphStructure,
452 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
453 config: &HydroWriteConfig,
454 ) -> usize {
455 use crate::location::dynamic::LocationId;
456
457 struct TransformParams<'a> {
461 structure: &'a mut HydroGraphStructure,
462 seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
463 config: &'a HydroWriteConfig,
464 input: &'a HydroNode,
465 metadata: &'a crate::compile::ir::HydroIrMetadata,
466 op_name: String,
467 node_type: HydroNodeType,
468 edge_type: HydroEdgeType,
469 }
470
471 fn build_simple_transform(params: TransformParams) -> usize {
473 let input_id = params.input.build_graph_structure(
474 params.structure,
475 params.seen_tees,
476 params.config,
477 );
478 let location_id = setup_location(params.structure, params.metadata);
479 let node_id = params.structure.add_node(
480 NodeLabel::Static(params.op_name.to_string()),
481 params.node_type,
482 location_id,
483 );
484 params
485 .structure
486 .add_edge(input_id, node_id, params.edge_type, None);
487 node_id
488 }
489
490 fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
492 let input_id = params.input.build_graph_structure(
493 params.structure,
494 params.seen_tees,
495 params.config,
496 );
497 let location_id = setup_location(params.structure, params.metadata);
498 let node_id = params.structure.add_node(
499 NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
500 params.node_type,
501 location_id,
502 );
503 params
504 .structure
505 .add_edge(input_id, node_id, params.edge_type, None);
506 node_id
507 }
508
509 fn build_dual_expr_transform(
511 params: TransformParams,
512 expr1: &DebugExpr,
513 expr2: &DebugExpr,
514 ) -> usize {
515 let input_id = params.input.build_graph_structure(
516 params.structure,
517 params.seen_tees,
518 params.config,
519 );
520 let location_id = setup_location(params.structure, params.metadata);
521 let node_id = params.structure.add_node(
522 NodeLabel::with_exprs(
523 params.op_name.to_string(),
524 vec![expr1.clone(), expr2.clone()],
525 ),
526 params.node_type,
527 location_id,
528 );
529 params
530 .structure
531 .add_edge(input_id, node_id, params.edge_type, None);
532 node_id
533 }
534
535 fn build_source_node(
537 structure: &mut HydroGraphStructure,
538 metadata: &crate::compile::ir::HydroIrMetadata,
539 label: String,
540 ) -> usize {
541 let location_id = setup_location(structure, metadata);
542 structure.add_node(NodeLabel::Static(label), HydroNodeType::Source, location_id)
543 }
544
545 match self {
546 HydroNode::Placeholder => structure.add_node(
547 NodeLabel::Static("PLACEHOLDER".to_string()),
548 HydroNodeType::Transform,
549 None,
550 ),
551
552 HydroNode::Source {
553 source, metadata, ..
554 } => {
555 let label = match source {
556 HydroSource::Stream(expr) => format!("source_stream({})", expr),
557 HydroSource::ExternalNetwork() => "external_network()".to_string(),
558 HydroSource::Iter(expr) => format!("source_iter({})", expr),
559 HydroSource::Spin() => "spin()".to_string(),
560 };
561 build_source_node(structure, metadata, label)
562 }
563
564 HydroNode::ExternalInput {
565 from_external_id,
566 from_key,
567 metadata,
568 ..
569 } => build_source_node(
570 structure,
571 metadata,
572 format!("external_input({}:{})", from_external_id, from_key),
573 ),
574
575 HydroNode::CycleSource {
576 ident, metadata, ..
577 } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
578
579 HydroNode::Tee { inner, metadata } => {
580 let ptr = inner.as_ptr();
581 if let Some(&existing_id) = seen_tees.get(&ptr) {
582 return existing_id;
583 }
584
585 let input_id = inner
586 .0
587 .borrow()
588 .build_graph_structure(structure, seen_tees, config);
589 let location_id = setup_location(structure, metadata);
590
591 let tee_id = structure.add_node(
592 NodeLabel::Static(extract_op_name(self.print_root())),
593 HydroNodeType::Tee,
594 location_id,
595 );
596
597 seen_tees.insert(ptr, tee_id);
598
599 structure.add_edge(input_id, tee_id, HydroEdgeType::Stream, None);
600
601 tee_id
602 }
603
604 HydroNode::Cast { inner, metadata }
606 | HydroNode::ObserveNonDet { inner, metadata }
607 | HydroNode::DeferTick {
608 input: inner,
609 metadata,
610 }
611 | HydroNode::Enumerate {
612 input: inner,
613 metadata,
614 ..
615 }
616 | HydroNode::Unique {
617 input: inner,
618 metadata,
619 }
620 | HydroNode::ResolveFutures {
621 input: inner,
622 metadata,
623 }
624 | HydroNode::ResolveFuturesOrdered {
625 input: inner,
626 metadata,
627 } => build_simple_transform(TransformParams {
628 structure,
629 seen_tees,
630 config,
631 input: inner,
632 metadata,
633 op_name: extract_op_name(self.print_root()),
634 node_type: HydroNodeType::Transform,
635 edge_type: HydroEdgeType::Stream,
636 }),
637
638 HydroNode::Persist { inner, metadata } => build_simple_transform(TransformParams {
640 structure,
641 seen_tees,
642 config,
643 input: inner,
644 metadata,
645 op_name: extract_op_name(self.print_root()),
646 node_type: HydroNodeType::Transform,
647 edge_type: HydroEdgeType::Persistent,
648 }),
649
650 HydroNode::Sort {
652 input: inner,
653 metadata,
654 } => build_simple_transform(TransformParams {
655 structure,
656 seen_tees,
657 config,
658 input: inner,
659 metadata,
660 op_name: extract_op_name(self.print_root()),
661 node_type: HydroNodeType::Aggregation,
662 edge_type: HydroEdgeType::Stream,
663 }),
664
665 HydroNode::Map { f, input, metadata }
667 | HydroNode::Filter { f, input, metadata }
668 | HydroNode::FlatMap { f, input, metadata }
669 | HydroNode::FilterMap { f, input, metadata }
670 | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
671 TransformParams {
672 structure,
673 seen_tees,
674 config,
675 input,
676 metadata,
677 op_name: extract_op_name(self.print_root()),
678 node_type: HydroNodeType::Transform,
679 edge_type: HydroEdgeType::Stream,
680 },
681 f,
682 ),
683
684 HydroNode::Reduce { f, input, metadata }
686 | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
687 TransformParams {
688 structure,
689 seen_tees,
690 config,
691 input,
692 metadata,
693 op_name: extract_op_name(self.print_root()),
694 node_type: HydroNodeType::Aggregation,
695 edge_type: HydroEdgeType::Stream,
696 },
697 f,
698 ),
699
700 HydroNode::Join {
702 left,
703 right,
704 metadata,
705 }
706 | HydroNode::CrossProduct {
707 left,
708 right,
709 metadata,
710 }
711 | HydroNode::CrossSingleton {
712 left,
713 right,
714 metadata,
715 } => {
716 let left_id = left.build_graph_structure(structure, seen_tees, config);
717 let right_id = right.build_graph_structure(structure, seen_tees, config);
718 let location_id = setup_location(structure, metadata);
719 let node_id = structure.add_node(
720 NodeLabel::Static(extract_op_name(self.print_root())),
721 HydroNodeType::Join,
722 location_id,
723 );
724 structure.add_edge(
725 left_id,
726 node_id,
727 HydroEdgeType::Stream,
728 Some("left".to_string()),
729 );
730 structure.add_edge(
731 right_id,
732 node_id,
733 HydroEdgeType::Stream,
734 Some("right".to_string()),
735 );
736 node_id
737 }
738
739 HydroNode::Difference {
741 pos: left,
742 neg: right,
743 metadata,
744 }
745 | HydroNode::AntiJoin {
746 pos: left,
747 neg: right,
748 metadata,
749 } => {
750 let left_id = left.build_graph_structure(structure, seen_tees, config);
751 let right_id = right.build_graph_structure(structure, seen_tees, config);
752 let location_id = setup_location(structure, metadata);
753 let node_id = structure.add_node(
754 NodeLabel::Static(extract_op_name(self.print_root())),
755 HydroNodeType::Join,
756 location_id,
757 );
758 structure.add_edge(
759 left_id,
760 node_id,
761 HydroEdgeType::Stream,
762 Some("pos".to_string()),
763 );
764 structure.add_edge(
765 right_id,
766 node_id,
767 HydroEdgeType::Stream,
768 Some("neg".to_string()),
769 );
770 node_id
771 }
772
773 HydroNode::Fold {
775 init,
776 acc,
777 input,
778 metadata,
779 }
780 | HydroNode::FoldKeyed {
781 init,
782 acc,
783 input,
784 metadata,
785 }
786 | HydroNode::Scan {
787 init,
788 acc,
789 input,
790 metadata,
791 } => {
792 let node_type = HydroNodeType::Aggregation; build_dual_expr_transform(
795 TransformParams {
796 structure,
797 seen_tees,
798 config,
799 input,
800 metadata,
801 op_name: extract_op_name(self.print_root()),
802 node_type,
803 edge_type: HydroEdgeType::Stream,
804 },
805 init,
806 acc,
807 )
808 }
809
810 HydroNode::ReduceKeyedWatermark {
812 f,
813 input,
814 watermark,
815 metadata,
816 } => {
817 let input_id = input.build_graph_structure(structure, seen_tees, config);
818 let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
819 let location_id = setup_location(structure, metadata);
820 let join_node_id = structure.add_node(
821 NodeLabel::Static(extract_op_name(self.print_root())),
822 HydroNodeType::Join,
823 location_id,
824 );
825 structure.add_edge(
826 input_id,
827 join_node_id,
828 HydroEdgeType::Stream,
829 Some("input".to_string()),
830 );
831 structure.add_edge(
832 watermark_id,
833 join_node_id,
834 HydroEdgeType::Stream,
835 Some("watermark".to_string()),
836 );
837
838 let node_id = structure.add_node(
839 NodeLabel::with_exprs(
840 extract_op_name(self.print_root()).to_string(),
841 vec![f.clone()],
842 ),
843 HydroNodeType::Aggregation,
844 location_id,
845 );
846 structure.add_edge(join_node_id, node_id, HydroEdgeType::Stream, None);
847 node_id
848 }
849
850 HydroNode::Network {
851 serialize_fn,
852 deserialize_fn,
853 input,
854 metadata,
855 ..
856 } => {
857 let input_id = input.build_graph_structure(structure, seen_tees, config);
858 let _from_location_id = setup_location(structure, metadata);
859
860 let to_location_id = match metadata.location_kind.root() {
861 LocationId::Process(id) => {
862 structure.add_location(*id, "Process".to_string());
863 Some(*id)
864 }
865 LocationId::Cluster(id) => {
866 structure.add_location(*id, "Cluster".to_string());
867 Some(*id)
868 }
869 _ => None,
870 };
871
872 let mut label = "network(".to_string();
873 if serialize_fn.is_some() {
874 label.push_str("ser");
875 }
876 if deserialize_fn.is_some() {
877 if serialize_fn.is_some() {
878 label.push_str(" + ");
879 }
880 label.push_str("deser");
881 }
882 label.push(')');
883
884 let network_id = structure.add_node(
885 NodeLabel::Static(label),
886 HydroNodeType::Network,
887 to_location_id,
888 );
889 structure.add_edge(
890 input_id,
891 network_id,
892 HydroEdgeType::Network,
893 Some(format!("to {:?}", to_location_id)),
894 );
895 network_id
896 }
897
898 HydroNode::Batch { inner, .. } => {
900 inner.build_graph_structure(structure, seen_tees, config)
902 }
903
904 HydroNode::YieldConcat { inner, .. } => {
905 inner.build_graph_structure(structure, seen_tees, config)
907 }
908
909 HydroNode::BeginAtomic { inner, .. } => {
910 inner.build_graph_structure(structure, seen_tees, config)
911 }
912
913 HydroNode::EndAtomic { inner, .. } => {
914 inner.build_graph_structure(structure, seen_tees, config)
915 }
916
917 HydroNode::Chain {
918 first,
919 second,
920 metadata,
921 } => {
922 let first_id = first.build_graph_structure(structure, seen_tees, config);
923 let second_id = second.build_graph_structure(structure, seen_tees, config);
924 let location_id = setup_location(structure, metadata);
925 let chain_id = structure.add_node(
926 NodeLabel::Static(extract_op_name(self.print_root())),
927 HydroNodeType::Transform,
928 location_id,
929 );
930 structure.add_edge(
931 first_id,
932 chain_id,
933 HydroEdgeType::Stream,
934 Some("first".to_string()),
935 );
936 structure.add_edge(
937 second_id,
938 chain_id,
939 HydroEdgeType::Stream,
940 Some("second".to_string()),
941 );
942 chain_id
943 }
944
945 HydroNode::ChainFirst {
946 first,
947 second,
948 metadata,
949 } => {
950 let first_id = first.build_graph_structure(structure, seen_tees, config);
951 let second_id = second.build_graph_structure(structure, seen_tees, config);
952 let location_id = setup_location(structure, metadata);
953 let chain_id = structure.add_node(
954 NodeLabel::Static(extract_op_name(self.print_root())),
955 HydroNodeType::Transform,
956 location_id,
957 );
958 structure.add_edge(
959 first_id,
960 chain_id,
961 HydroEdgeType::Stream,
962 Some("first".to_string()),
963 );
964 structure.add_edge(
965 second_id,
966 chain_id,
967 HydroEdgeType::Stream,
968 Some("second".to_string()),
969 );
970 chain_id
971 }
972
973 HydroNode::Counter {
974 tag: _,
975 prefix: _,
976 duration,
977 input,
978 metadata,
979 } => build_single_expr_transform(
980 TransformParams {
981 structure,
982 seen_tees,
983 config,
984 input,
985 metadata,
986 op_name: extract_op_name(self.print_root()),
987 node_type: HydroNodeType::Transform,
988 edge_type: HydroEdgeType::Stream,
989 },
990 duration,
991 ),
992 }
993 }
994}
995
996macro_rules! render_hydro_ir {
999 ($name:ident, $write_fn:ident) => {
1000 pub fn $name(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
1001 let mut output = String::new();
1002 $write_fn(&mut output, roots, config).unwrap();
1003 output
1004 }
1005 };
1006}
1007
1008macro_rules! write_hydro_ir {
1010 ($name:ident, $writer_type:ty, $constructor:expr) => {
1011 pub fn $name(
1012 output: impl std::fmt::Write,
1013 roots: &[HydroRoot],
1014 config: &HydroWriteConfig,
1015 ) -> std::fmt::Result {
1016 let mut graph_write: $writer_type = $constructor(output, config);
1017 write_hydro_ir_graph(&mut graph_write, roots, config)
1018 }
1019 };
1020}
1021
1022render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1023write_hydro_ir!(
1024 write_hydro_ir_mermaid,
1025 HydroMermaid<_>,
1026 HydroMermaid::new_with_config
1027);
1028
1029render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1030write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1031
1032render_hydro_ir!(render_hydro_ir_reactflow, write_hydro_ir_reactflow);
1033write_hydro_ir!(
1034 write_hydro_ir_reactflow,
1035 HydroReactFlow<_>,
1036 HydroReactFlow::new
1037);
1038
1039fn write_hydro_ir_graph<W>(
1040 mut graph_write: W,
1041 roots: &[HydroRoot],
1042 config: &HydroWriteConfig,
1043) -> Result<(), W::Err>
1044where
1045 W: HydroGraphWrite,
1046{
1047 let mut structure = HydroGraphStructure::new();
1048 let mut seen_tees = HashMap::new();
1049
1050 for leaf in roots {
1052 leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1053 }
1054
1055 graph_write.write_prologue()?;
1057
1058 for (&node_id, (label, node_type, location)) in &structure.nodes {
1059 let (location_id, location_type) = if let Some(loc_id) = location {
1060 (
1061 Some(*loc_id),
1062 structure.locations.get(loc_id).map(|s| s.as_str()),
1063 )
1064 } else {
1065 (None, None)
1066 };
1067 graph_write.write_node_definition(
1068 node_id,
1069 label,
1070 *node_type,
1071 location_id,
1072 location_type,
1073 )?;
1074 }
1075
1076 if config.show_location_groups {
1077 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
1078 for (&node_id, (_, _, location)) in &structure.nodes {
1079 if let Some(location_id) = location {
1080 nodes_by_location
1081 .entry(*location_id)
1082 .or_default()
1083 .push(node_id);
1084 }
1085 }
1086
1087 for (&location_id, node_ids) in &nodes_by_location {
1088 if let Some(location_type) = structure.locations.get(&location_id) {
1089 graph_write.write_location_start(location_id, location_type)?;
1090 for &node_id in node_ids {
1091 graph_write.write_node(node_id)?;
1092 }
1093 graph_write.write_location_end()?;
1094 }
1095 }
1096 }
1097
1098 for (src_id, dst_id, edge_type, label) in &structure.edges {
1099 graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
1100 }
1101
1102 graph_write.write_epilogue()?;
1103 Ok(())
1104}