1use std::collections::{HashMap, HashSet};
2use std::error::Error;
3use std::fmt::{Display, Write};
4use std::num::ParseIntError;
5use std::str::FromStr;
6
7use auto_impl::auto_impl;
8use slotmap::{KeyData, SlotMap};
9
10pub use super::graphviz::{HydroDot, escape_dot};
11pub use super::json::HydroJson;
12pub use super::mermaid::{HydroMermaid, escape_mermaid};
14use crate::compile::ir::backtrace::Backtrace;
15use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
16use crate::location::dynamic::LocationId;
17
18#[derive(Debug, Clone)]
20pub enum NodeLabel {
21 Static(String),
23 WithExprs {
25 op_name: String,
26 exprs: Vec<DebugExpr>,
27 },
28}
29
30impl NodeLabel {
31 pub fn static_label(s: String) -> Self {
33 Self::Static(s)
34 }
35
36 pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
38 Self::WithExprs { op_name, exprs }
39 }
40}
41
42impl Display for NodeLabel {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 match self {
45 Self::Static(s) => write!(f, "{}", s),
46 Self::WithExprs { op_name, exprs } => {
47 if exprs.is_empty() {
48 write!(f, "{}()", op_name)
49 } else {
50 let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
51 write!(f, "{}({})", op_name, expr_strs.join(", "))
52 }
53 }
54 }
55 }
56}
57
58pub struct IndentedGraphWriter<W> {
61 pub write: W,
62 pub indent: usize,
63 pub config: HydroWriteConfig,
64}
65
66impl<W> IndentedGraphWriter<W> {
67 pub fn new(write: W) -> Self {
69 Self {
70 write,
71 indent: 0,
72 config: HydroWriteConfig::default(),
73 }
74 }
75
76 pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
78 Self {
79 write,
80 indent: 0,
81 config: config.clone(),
82 }
83 }
84}
85
86impl<W: Write> IndentedGraphWriter<W> {
87 pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
89 writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
90 }
91}
92
93pub type GraphWriteError = std::fmt::Error;
95
96#[auto_impl(&mut, Box)]
98pub trait HydroGraphWrite {
99 type Err: Error;
101
102 fn write_prologue(&mut self) -> Result<(), Self::Err>;
104
105 fn write_node_definition(
107 &mut self,
108 node_id: VizNodeKey,
109 node_label: &NodeLabel,
110 node_type: HydroNodeType,
111 location_id: Option<usize>,
112 location_type: Option<&str>,
113 backtrace: Option<&Backtrace>,
114 ) -> Result<(), Self::Err>;
115
116 fn write_edge(
118 &mut self,
119 src_id: VizNodeKey,
120 dst_id: VizNodeKey,
121 edge_properties: &HashSet<HydroEdgeProp>,
122 label: Option<&str>,
123 ) -> Result<(), Self::Err>;
124
125 fn write_location_start(
127 &mut self,
128 location_id: usize,
129 location_type: &str,
130 ) -> Result<(), Self::Err>;
131
132 fn write_node(&mut self, node_id: VizNodeKey) -> Result<(), Self::Err>;
134
135 fn write_location_end(&mut self) -> Result<(), Self::Err>;
137
138 fn write_epilogue(&mut self) -> Result<(), Self::Err>;
140}
141
142pub mod node_type_utils {
144 use super::HydroNodeType;
145
146 const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
148 (HydroNodeType::Source, "Source"),
149 (HydroNodeType::Transform, "Transform"),
150 (HydroNodeType::Join, "Join"),
151 (HydroNodeType::Aggregation, "Aggregation"),
152 (HydroNodeType::Network, "Network"),
153 (HydroNodeType::Sink, "Sink"),
154 (HydroNodeType::Tee, "Tee"),
155 (HydroNodeType::NonDeterministic, "NonDeterministic"),
156 ];
157
158 pub fn to_string(node_type: HydroNodeType) -> &'static str {
160 NODE_TYPE_DATA
161 .iter()
162 .find(|(nt, _)| *nt == node_type)
163 .map(|(_, name)| *name)
164 .unwrap_or("Unknown")
165 }
166
167 pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
169 NODE_TYPE_DATA.to_vec()
170 }
171}
172
173#[derive(Debug, Clone, Copy, PartialEq, Eq)]
175pub enum HydroNodeType {
176 Source,
177 Transform,
178 Join,
179 Aggregation,
180 Network,
181 Sink,
182 Tee,
183 NonDeterministic,
184}
185
186#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
188pub enum HydroEdgeProp {
189 Bounded,
190 Unbounded,
191 TotalOrder,
192 NoOrder,
193 Keyed,
194 Stream,
196 KeyedSingleton,
197 KeyedStream,
198 Singleton,
199 Optional,
200 Network,
201 Cycle,
202}
203
204#[derive(Debug, Clone, PartialEq, Eq)]
207pub struct UnifiedEdgeStyle {
208 pub line_pattern: LinePattern,
210 pub line_width: u8,
212 pub arrowhead: ArrowheadStyle,
214 pub line_style: LineStyle,
216 pub halo: HaloStyle,
218 pub waviness: WavinessStyle,
220 pub animation: AnimationStyle,
222 pub color: &'static str,
224}
225
226#[derive(Debug, Clone, Copy, PartialEq, Eq)]
227pub enum LinePattern {
228 Solid,
229 Dotted,
230 Dashed,
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq)]
234pub enum ArrowheadStyle {
235 TriangleFilled,
236 CircleFilled,
237 DiamondOpen,
238 Default,
239}
240
241#[derive(Debug, Clone, Copy, PartialEq, Eq)]
242pub enum LineStyle {
243 Single,
245 HashMarks,
247}
248
249#[derive(Debug, Clone, Copy, PartialEq, Eq)]
250pub enum HaloStyle {
251 None,
252 LightBlue,
253}
254
255#[derive(Debug, Clone, Copy, PartialEq, Eq)]
256pub enum WavinessStyle {
257 None,
258 Wavy,
259}
260
261#[derive(Debug, Clone, Copy, PartialEq, Eq)]
262pub enum AnimationStyle {
263 Static,
264 Animated,
265}
266
267impl Default for UnifiedEdgeStyle {
268 fn default() -> Self {
269 Self {
270 line_pattern: LinePattern::Solid,
271 line_width: 1,
272 arrowhead: ArrowheadStyle::Default,
273 line_style: LineStyle::Single,
274 halo: HaloStyle::None,
275 waviness: WavinessStyle::None,
276 animation: AnimationStyle::Static,
277 color: "#666666",
278 }
279 }
280}
281
282pub fn get_unified_edge_style(
295 edge_properties: &HashSet<HydroEdgeProp>,
296 src_location: Option<usize>,
297 dst_location: Option<usize>,
298) -> UnifiedEdgeStyle {
299 let mut style = UnifiedEdgeStyle::default();
300
301 let is_network = edge_properties.contains(&HydroEdgeProp::Network)
303 || (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
304
305 if is_network {
306 style.line_pattern = LinePattern::Dashed;
307 style.animation = AnimationStyle::Animated;
308 } else {
309 style.line_pattern = LinePattern::Solid;
310 style.animation = AnimationStyle::Static;
311 }
312
313 if edge_properties.contains(&HydroEdgeProp::Unbounded) {
315 style.halo = HaloStyle::LightBlue;
316 } else {
317 style.halo = HaloStyle::None;
318 }
319
320 if edge_properties.contains(&HydroEdgeProp::Stream) {
322 style.arrowhead = ArrowheadStyle::TriangleFilled;
323 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
325 style.arrowhead = ArrowheadStyle::TriangleFilled;
326 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
328 style.arrowhead = ArrowheadStyle::TriangleFilled;
329 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
331 style.arrowhead = ArrowheadStyle::CircleFilled;
332 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Optional) {
334 style.arrowhead = ArrowheadStyle::DiamondOpen;
335 style.color = "#6b7280"; }
337
338 if edge_properties.contains(&HydroEdgeProp::Keyed) {
340 style.line_style = LineStyle::HashMarks; } else {
342 style.line_style = LineStyle::Single;
343 }
344
345 if edge_properties.contains(&HydroEdgeProp::NoOrder) {
347 style.waviness = WavinessStyle::Wavy;
348 } else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
349 style.waviness = WavinessStyle::None;
350 }
351
352 style
353}
354
355pub fn extract_edge_properties_from_collection_kind(
359 collection_kind: &crate::compile::ir::CollectionKind,
360) -> HashSet<HydroEdgeProp> {
361 use crate::compile::ir::CollectionKind;
362
363 let mut properties = HashSet::new();
364
365 match collection_kind {
366 CollectionKind::Stream { bound, order, .. } => {
367 properties.insert(HydroEdgeProp::Stream);
368 add_bound_property(&mut properties, bound);
369 add_order_property(&mut properties, order);
370 }
371 CollectionKind::KeyedStream {
372 bound, value_order, ..
373 } => {
374 properties.insert(HydroEdgeProp::KeyedStream);
375 properties.insert(HydroEdgeProp::Keyed);
376 add_bound_property(&mut properties, bound);
377 add_order_property(&mut properties, value_order);
378 }
379 CollectionKind::Singleton { bound, .. } => {
380 properties.insert(HydroEdgeProp::Singleton);
381 add_bound_property(&mut properties, bound);
382 properties.insert(HydroEdgeProp::TotalOrder);
384 }
385 CollectionKind::Optional { bound, .. } => {
386 properties.insert(HydroEdgeProp::Optional);
387 add_bound_property(&mut properties, bound);
388 properties.insert(HydroEdgeProp::TotalOrder);
390 }
391 CollectionKind::KeyedSingleton { bound, .. } => {
392 properties.insert(HydroEdgeProp::Singleton);
393 properties.insert(HydroEdgeProp::Keyed);
394 add_keyed_singleton_bound_property(&mut properties, bound);
396 properties.insert(HydroEdgeProp::TotalOrder);
397 }
398 }
399
400 properties
401}
402
403fn add_bound_property(
405 properties: &mut HashSet<HydroEdgeProp>,
406 bound: &crate::compile::ir::BoundKind,
407) {
408 use crate::compile::ir::BoundKind;
409
410 match bound {
411 BoundKind::Bounded => {
412 properties.insert(HydroEdgeProp::Bounded);
413 }
414 BoundKind::Unbounded => {
415 properties.insert(HydroEdgeProp::Unbounded);
416 }
417 }
418}
419
420fn add_keyed_singleton_bound_property(
422 properties: &mut HashSet<HydroEdgeProp>,
423 bound: &crate::compile::ir::KeyedSingletonBoundKind,
424) {
425 use crate::compile::ir::KeyedSingletonBoundKind;
426
427 match bound {
428 KeyedSingletonBoundKind::Bounded | KeyedSingletonBoundKind::BoundedValue => {
429 properties.insert(HydroEdgeProp::Bounded);
430 }
431 KeyedSingletonBoundKind::Unbounded => {
432 properties.insert(HydroEdgeProp::Unbounded);
433 }
434 }
435}
436
437fn add_order_property(
439 properties: &mut HashSet<HydroEdgeProp>,
440 order: &crate::compile::ir::StreamOrder,
441) {
442 use crate::compile::ir::StreamOrder;
443
444 match order {
445 StreamOrder::TotalOrder => {
446 properties.insert(HydroEdgeProp::TotalOrder);
447 }
448 StreamOrder::NoOrder => {
449 properties.insert(HydroEdgeProp::NoOrder);
450 }
451 }
452}
453
454pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
457 src_location.root() != dst_location.root()
459}
460
461pub fn add_network_edge_tag(
463 properties: &mut HashSet<HydroEdgeProp>,
464 src_location: &LocationId,
465 dst_location: &LocationId,
466) {
467 if is_network_edge(src_location, dst_location) {
468 properties.insert(HydroEdgeProp::Network);
469 }
470}
471
472#[derive(Debug, Clone)]
474pub struct HydroWriteConfig {
475 pub show_metadata: bool,
476 pub show_location_groups: bool,
477 pub use_short_labels: bool,
478 pub process_id_name: Vec<(usize, String)>,
479 pub cluster_id_name: Vec<(usize, String)>,
480 pub external_id_name: Vec<(usize, String)>,
481}
482
483impl Default for HydroWriteConfig {
484 fn default() -> Self {
485 Self {
486 show_metadata: false,
487 show_location_groups: true,
488 use_short_labels: true, process_id_name: vec![],
490 cluster_id_name: vec![],
491 external_id_name: vec![],
492 }
493 }
494}
495
496#[derive(Clone)]
498pub struct HydroGraphNode {
499 pub label: NodeLabel,
500 pub node_type: HydroNodeType,
501 pub location: Option<usize>,
502 pub backtrace: Option<Backtrace>,
503}
504
505slotmap::new_key_type! {
506 pub struct VizNodeKey;
510}
511
512impl Display for VizNodeKey {
513 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
514 write!(f, "{:?}", self) }
516}
517
518impl FromStr for VizNodeKey {
519 type Err = Option<ParseIntError>;
520
521 fn from_str(s: &str) -> Result<Self, Self::Err> {
522 let s = s.strip_prefix("VizNodeKey(").ok_or(None)?;
524 let s = s.strip_suffix(")").ok_or(None)?;
525 let (idx, version) = s.split_once("v").ok_or(None)?;
526 let idx: u64 = idx.parse()?;
527 let version: u64 = version.parse()?;
528 let key_data = KeyData::from_ffi((version << 32) | idx);
529 Ok(Self::from(key_data))
530 }
531}
532
533#[derive(Debug, Clone)]
535pub struct HydroGraphEdge {
536 pub src: VizNodeKey,
537 pub dst: VizNodeKey,
538 pub edge_properties: HashSet<HydroEdgeProp>,
539 pub label: Option<String>,
540}
541
542#[derive(Default)]
544pub struct HydroGraphStructure {
545 pub nodes: SlotMap<VizNodeKey, HydroGraphNode>,
546 pub edges: Vec<HydroGraphEdge>,
547 pub locations: HashMap<usize, String>, }
549
550impl HydroGraphStructure {
551 pub fn new() -> Self {
552 Self::default()
553 }
554
555 pub fn add_node(
556 &mut self,
557 label: NodeLabel,
558 node_type: HydroNodeType,
559 location: Option<usize>,
560 ) -> VizNodeKey {
561 self.add_node_with_backtrace(label, node_type, location, None)
562 }
563
564 pub fn add_node_with_backtrace(
565 &mut self,
566 label: NodeLabel,
567 node_type: HydroNodeType,
568 location: Option<usize>,
569 backtrace: Option<Backtrace>,
570 ) -> VizNodeKey {
571 self.nodes.insert(HydroGraphNode {
572 label,
573 node_type,
574 location,
575 backtrace,
576 })
577 }
578
579 pub fn add_node_with_metadata(
581 &mut self,
582 label: NodeLabel,
583 node_type: HydroNodeType,
584 metadata: &HydroIrMetadata,
585 ) -> VizNodeKey {
586 let location = setup_location(self, metadata);
587 let backtrace = Some(metadata.op.backtrace.clone());
588 self.add_node_with_backtrace(label, node_type, location, backtrace)
589 }
590
591 pub fn add_edge(
592 &mut self,
593 src: VizNodeKey,
594 dst: VizNodeKey,
595 edge_properties: HashSet<HydroEdgeProp>,
596 label: Option<String>,
597 ) {
598 self.edges.push(HydroGraphEdge {
599 src,
600 dst,
601 edge_properties,
602 label,
603 });
604 }
605
606 pub fn add_edge_single(
608 &mut self,
609 src: VizNodeKey,
610 dst: VizNodeKey,
611 edge_type: HydroEdgeProp,
612 label: Option<String>,
613 ) {
614 let mut properties = HashSet::new();
615 properties.insert(edge_type);
616 self.edges.push(HydroGraphEdge {
617 src,
618 dst,
619 edge_properties: properties,
620 label,
621 });
622 }
623
624 pub fn add_location(&mut self, location_id: usize, location_type: String) {
625 self.locations.insert(location_id, location_type);
626 }
627}
628
629pub fn extract_op_name(full_label: String) -> String {
631 full_label
632 .split('(')
633 .next()
634 .unwrap_or("unknown")
635 .to_string()
636 .to_lowercase()
637}
638
639pub fn extract_short_label(full_label: &str) -> String {
641 if let Some(op_name) = full_label.split('(').next() {
643 let base_name = op_name.to_lowercase();
644 match base_name.as_str() {
645 "source" => {
647 if full_label.contains("Iter") {
648 "source_iter".to_string()
649 } else if full_label.contains("Stream") {
650 "source_stream".to_string()
651 } else if full_label.contains("ExternalNetwork") {
652 "external_network".to_string()
653 } else if full_label.contains("Spin") {
654 "spin".to_string()
655 } else {
656 "source".to_string()
657 }
658 }
659 "network" => {
660 if full_label.contains("deser") {
661 "network(recv)".to_string()
662 } else if full_label.contains("ser") {
663 "network(send)".to_string()
664 } else {
665 "network".to_string()
666 }
667 }
668 _ => base_name,
670 }
671 } else {
672 if full_label.len() > 20 {
674 format!("{}...", &full_label[..17])
675 } else {
676 full_label.to_string()
677 }
678 }
679}
680
681fn extract_location_id(location_id: &LocationId) -> (Option<usize>, Option<String>) {
683 match location_id.root() {
684 LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
685 LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
686 _ => panic!("unexpected location type"),
687 }
688}
689
690fn setup_location(
692 structure: &mut HydroGraphStructure,
693 metadata: &HydroIrMetadata,
694) -> Option<usize> {
695 let (location_id, location_type) = extract_location_id(&metadata.location_kind);
696 if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
697 structure.add_location(loc_id, loc_type);
698 }
699 location_id
700}
701
702fn add_edge_with_metadata(
705 structure: &mut HydroGraphStructure,
706 src_id: VizNodeKey,
707 dst_id: VizNodeKey,
708 src_metadata: Option<&HydroIrMetadata>,
709 dst_metadata: Option<&HydroIrMetadata>,
710 label: Option<String>,
711) {
712 let mut properties = HashSet::new();
713
714 if let Some(metadata) = src_metadata {
716 properties.extend(extract_edge_properties_from_collection_kind(
717 &metadata.collection_kind,
718 ));
719 }
720
721 if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
723 add_network_edge_tag(
724 &mut properties,
725 &src_meta.location_kind,
726 &dst_meta.location_kind,
727 );
728 }
729
730 if properties.is_empty() {
732 properties.insert(HydroEdgeProp::Stream);
733 }
734
735 structure.add_edge(src_id, dst_id, properties, label);
736}
737
738fn write_graph_structure<W>(
740 structure: &HydroGraphStructure,
741 graph_write: W,
742 config: &HydroWriteConfig,
743) -> Result<(), W::Err>
744where
745 W: HydroGraphWrite,
746{
747 let mut graph_write = graph_write;
748 graph_write.write_prologue()?;
750
751 for (node_id, node) in structure.nodes.iter() {
753 let (location_id, location_type) = if let Some(loc_id) = node.location {
754 (
755 Some(loc_id),
756 structure.locations.get(&loc_id).map(|s| s.as_str()),
757 )
758 } else {
759 (None, None)
760 };
761
762 graph_write.write_node_definition(
763 node_id,
764 &node.label,
765 node.node_type,
766 location_id,
767 location_type,
768 node.backtrace.as_ref(),
769 )?;
770 }
771
772 if config.show_location_groups {
774 let mut nodes_by_location: HashMap<usize, Vec<VizNodeKey>> = HashMap::new();
775 for (node_id, node) in structure.nodes.iter() {
776 if let Some(location_id) = node.location {
777 nodes_by_location
778 .entry(location_id)
779 .or_default()
780 .push(node_id);
781 }
782 }
783
784 for (&location_id, node_ids) in &nodes_by_location {
785 if let Some(location_type) = structure.locations.get(&location_id) {
786 graph_write.write_location_start(location_id, location_type)?;
787 for &node_id in node_ids {
788 graph_write.write_node(node_id)?;
789 }
790 graph_write.write_location_end()?;
791 }
792 }
793 }
794
795 for edge in &structure.edges {
797 graph_write.write_edge(
798 edge.src,
799 edge.dst,
800 &edge.edge_properties,
801 edge.label.as_deref(),
802 )?;
803 }
804
805 graph_write.write_epilogue()?;
806 Ok(())
807}
808
809impl HydroRoot {
810 pub fn build_graph_structure(
812 &self,
813 structure: &mut HydroGraphStructure,
814 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
815 config: &HydroWriteConfig,
816 ) -> VizNodeKey {
817 fn build_sink_node(
819 structure: &mut HydroGraphStructure,
820 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
821 config: &HydroWriteConfig,
822 input: &HydroNode,
823 sink_metadata: Option<&HydroIrMetadata>,
824 label: NodeLabel,
825 ) -> VizNodeKey {
826 let input_id = input.build_graph_structure(structure, seen_tees, config);
827
828 let effective_metadata = if let Some(meta) = sink_metadata {
830 Some(meta)
831 } else {
832 match input {
833 HydroNode::Placeholder => None,
834 _ => Some(input.metadata()),
836 }
837 };
838
839 let location_id = effective_metadata.and_then(|m| setup_location(structure, m));
840 let sink_id = structure.add_node_with_backtrace(
841 label,
842 HydroNodeType::Sink,
843 location_id,
844 effective_metadata.map(|m| m.op.backtrace.clone()),
845 );
846
847 let input_metadata = input.metadata();
849 add_edge_with_metadata(
850 structure,
851 input_id,
852 sink_id,
853 Some(input_metadata),
854 sink_metadata,
855 None,
856 );
857
858 sink_id
859 }
860
861 match self {
862 HydroRoot::ForEach { f, input, .. } => build_sink_node(
864 structure,
865 seen_tees,
866 config,
867 input,
868 None,
869 NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
870 ),
871
872 HydroRoot::SendExternal {
873 to_external_id,
874 to_port_id,
875 input,
876 ..
877 } => build_sink_node(
878 structure,
879 seen_tees,
880 config,
881 input,
882 None,
883 NodeLabel::with_exprs(
884 format!("send_external({}:{})", to_external_id, to_port_id),
885 vec![],
886 ),
887 ),
888
889 HydroRoot::DestSink { sink, input, .. } => build_sink_node(
890 structure,
891 seen_tees,
892 config,
893 input,
894 None,
895 NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
896 ),
897
898 HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
899 structure,
900 seen_tees,
901 config,
902 input,
903 None,
904 NodeLabel::static_label(format!("cycle_sink({})", ident)),
905 ),
906 }
907 }
908}
909
910impl HydroNode {
911 pub fn build_graph_structure(
913 &self,
914 structure: &mut HydroGraphStructure,
915 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
916 config: &HydroWriteConfig,
917 ) -> VizNodeKey {
918 use crate::location::dynamic::LocationId;
919
920 struct TransformParams<'a> {
924 structure: &'a mut HydroGraphStructure,
925 seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
926 config: &'a HydroWriteConfig,
927 input: &'a HydroNode,
928 metadata: &'a HydroIrMetadata,
929 op_name: String,
930 node_type: HydroNodeType,
931 }
932
933 fn build_simple_transform(params: TransformParams) -> VizNodeKey {
935 let input_id = params.input.build_graph_structure(
936 params.structure,
937 params.seen_tees,
938 params.config,
939 );
940 let node_id = params.structure.add_node_with_metadata(
941 NodeLabel::Static(params.op_name.to_string()),
942 params.node_type,
943 params.metadata,
944 );
945
946 let input_metadata = params.input.metadata();
948 add_edge_with_metadata(
949 params.structure,
950 input_id,
951 node_id,
952 Some(input_metadata),
953 Some(params.metadata),
954 None,
955 );
956
957 node_id
958 }
959
960 fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> VizNodeKey {
962 let input_id = params.input.build_graph_structure(
963 params.structure,
964 params.seen_tees,
965 params.config,
966 );
967 let node_id = params.structure.add_node_with_metadata(
968 NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
969 params.node_type,
970 params.metadata,
971 );
972
973 let input_metadata = params.input.metadata();
975 add_edge_with_metadata(
976 params.structure,
977 input_id,
978 node_id,
979 Some(input_metadata),
980 Some(params.metadata),
981 None,
982 );
983
984 node_id
985 }
986
987 fn build_dual_expr_transform(
989 params: TransformParams,
990 expr1: &DebugExpr,
991 expr2: &DebugExpr,
992 ) -> VizNodeKey {
993 let input_id = params.input.build_graph_structure(
994 params.structure,
995 params.seen_tees,
996 params.config,
997 );
998 let node_id = params.structure.add_node_with_metadata(
999 NodeLabel::with_exprs(
1000 params.op_name.to_string(),
1001 vec![expr1.clone(), expr2.clone()],
1002 ),
1003 params.node_type,
1004 params.metadata,
1005 );
1006
1007 let input_metadata = params.input.metadata();
1009 add_edge_with_metadata(
1010 params.structure,
1011 input_id,
1012 node_id,
1013 Some(input_metadata),
1014 Some(params.metadata),
1015 None,
1016 );
1017
1018 node_id
1019 }
1020
1021 fn build_source_node(
1023 structure: &mut HydroGraphStructure,
1024 metadata: &HydroIrMetadata,
1025 label: String,
1026 ) -> VizNodeKey {
1027 structure.add_node_with_metadata(
1028 NodeLabel::Static(label),
1029 HydroNodeType::Source,
1030 metadata,
1031 )
1032 }
1033
1034 match self {
1035 HydroNode::Placeholder => structure.add_node(
1036 NodeLabel::Static("PLACEHOLDER".to_string()),
1037 HydroNodeType::Transform,
1038 None,
1039 ),
1040
1041 HydroNode::Source {
1042 source, metadata, ..
1043 } => {
1044 let label = match source {
1045 HydroSource::Stream(expr) => format!("source_stream({})", expr),
1046 HydroSource::ExternalNetwork() => "external_network()".to_string(),
1047 HydroSource::Iter(expr) => format!("source_iter({})", expr),
1048 HydroSource::Spin() => "spin()".to_string(),
1049 HydroSource::ClusterMembers(location_id) => {
1050 format!(
1051 "source_stream(cluster_membership_stream({:?}))",
1052 location_id
1053 )
1054 }
1055 };
1056 build_source_node(structure, metadata, label)
1057 }
1058
1059 HydroNode::SingletonSource { value, metadata } => {
1060 let label = format!("singleton({})", value);
1061 build_source_node(structure, metadata, label)
1062 }
1063
1064 HydroNode::ExternalInput {
1065 from_external_id,
1066 from_port_id,
1067 metadata,
1068 ..
1069 } => build_source_node(
1070 structure,
1071 metadata,
1072 format!("external_input({}:{})", from_external_id, from_port_id),
1073 ),
1074
1075 HydroNode::CycleSource {
1076 ident, metadata, ..
1077 } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
1078
1079 HydroNode::Tee { inner, metadata } => {
1080 let ptr = inner.as_ptr();
1081 if let Some(&existing_id) = seen_tees.get(&ptr) {
1082 return existing_id;
1083 }
1084
1085 let input_id = inner
1086 .0
1087 .borrow()
1088 .build_graph_structure(structure, seen_tees, config);
1089 let tee_id = structure.add_node_with_metadata(
1090 NodeLabel::Static(extract_op_name(self.print_root())),
1091 HydroNodeType::Tee,
1092 metadata,
1093 );
1094
1095 seen_tees.insert(ptr, tee_id);
1096
1097 let inner_borrow = inner.0.borrow();
1099 let input_metadata = inner_borrow.metadata();
1100 add_edge_with_metadata(
1101 structure,
1102 input_id,
1103 tee_id,
1104 Some(input_metadata),
1105 Some(metadata),
1106 None,
1107 );
1108 drop(inner_borrow);
1109
1110 tee_id
1111 }
1112
1113 HydroNode::ObserveNonDet {
1115 inner, metadata, ..
1116 } => build_simple_transform(TransformParams {
1117 structure,
1118 seen_tees,
1119 config,
1120 input: inner,
1121 metadata,
1122 op_name: extract_op_name(self.print_root()),
1123 node_type: HydroNodeType::NonDeterministic,
1124 }),
1125
1126 HydroNode::Cast { inner, metadata }
1128 | HydroNode::DeferTick {
1129 input: inner,
1130 metadata,
1131 }
1132 | HydroNode::Enumerate {
1133 input: inner,
1134 metadata,
1135 ..
1136 }
1137 | HydroNode::Unique {
1138 input: inner,
1139 metadata,
1140 }
1141 | HydroNode::ResolveFutures {
1142 input: inner,
1143 metadata,
1144 }
1145 | HydroNode::ResolveFuturesOrdered {
1146 input: inner,
1147 metadata,
1148 } => build_simple_transform(TransformParams {
1149 structure,
1150 seen_tees,
1151 config,
1152 input: inner,
1153 metadata,
1154 op_name: extract_op_name(self.print_root()),
1155 node_type: HydroNodeType::Transform,
1156 }),
1157
1158 HydroNode::Sort {
1160 input: inner,
1161 metadata,
1162 } => build_simple_transform(TransformParams {
1163 structure,
1164 seen_tees,
1165 config,
1166 input: inner,
1167 metadata,
1168 op_name: extract_op_name(self.print_root()),
1169 node_type: HydroNodeType::Aggregation,
1170 }),
1171
1172 HydroNode::Map { f, input, metadata }
1174 | HydroNode::Filter { f, input, metadata }
1175 | HydroNode::FlatMap { f, input, metadata }
1176 | HydroNode::FilterMap { f, input, metadata }
1177 | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
1178 TransformParams {
1179 structure,
1180 seen_tees,
1181 config,
1182 input,
1183 metadata,
1184 op_name: extract_op_name(self.print_root()),
1185 node_type: HydroNodeType::Transform,
1186 },
1187 f,
1188 ),
1189
1190 HydroNode::Reduce { f, input, metadata }
1192 | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
1193 TransformParams {
1194 structure,
1195 seen_tees,
1196 config,
1197 input,
1198 metadata,
1199 op_name: extract_op_name(self.print_root()),
1200 node_type: HydroNodeType::Aggregation,
1201 },
1202 f,
1203 ),
1204
1205 HydroNode::Join {
1207 left,
1208 right,
1209 metadata,
1210 }
1211 | HydroNode::CrossProduct {
1212 left,
1213 right,
1214 metadata,
1215 }
1216 | HydroNode::CrossSingleton {
1217 left,
1218 right,
1219 metadata,
1220 } => {
1221 let left_id = left.build_graph_structure(structure, seen_tees, config);
1222 let right_id = right.build_graph_structure(structure, seen_tees, config);
1223 let node_id = structure.add_node_with_metadata(
1224 NodeLabel::Static(extract_op_name(self.print_root())),
1225 HydroNodeType::Join,
1226 metadata,
1227 );
1228
1229 let left_metadata = left.metadata();
1231 add_edge_with_metadata(
1232 structure,
1233 left_id,
1234 node_id,
1235 Some(left_metadata),
1236 Some(metadata),
1237 Some("left".to_string()),
1238 );
1239
1240 let right_metadata = right.metadata();
1242 add_edge_with_metadata(
1243 structure,
1244 right_id,
1245 node_id,
1246 Some(right_metadata),
1247 Some(metadata),
1248 Some("right".to_string()),
1249 );
1250
1251 node_id
1252 }
1253
1254 HydroNode::Difference {
1256 pos: left,
1257 neg: right,
1258 metadata,
1259 }
1260 | HydroNode::AntiJoin {
1261 pos: left,
1262 neg: right,
1263 metadata,
1264 } => {
1265 let left_id = left.build_graph_structure(structure, seen_tees, config);
1266 let right_id = right.build_graph_structure(structure, seen_tees, config);
1267 let node_id = structure.add_node_with_metadata(
1268 NodeLabel::Static(extract_op_name(self.print_root())),
1269 HydroNodeType::Join,
1270 metadata,
1271 );
1272
1273 let left_metadata = left.metadata();
1275 add_edge_with_metadata(
1276 structure,
1277 left_id,
1278 node_id,
1279 Some(left_metadata),
1280 Some(metadata),
1281 Some("pos".to_string()),
1282 );
1283
1284 let right_metadata = right.metadata();
1286 add_edge_with_metadata(
1287 structure,
1288 right_id,
1289 node_id,
1290 Some(right_metadata),
1291 Some(metadata),
1292 Some("neg".to_string()),
1293 );
1294
1295 node_id
1296 }
1297
1298 HydroNode::Fold {
1300 init,
1301 acc,
1302 input,
1303 metadata,
1304 }
1305 | HydroNode::FoldKeyed {
1306 init,
1307 acc,
1308 input,
1309 metadata,
1310 }
1311 | HydroNode::Scan {
1312 init,
1313 acc,
1314 input,
1315 metadata,
1316 } => {
1317 let node_type = HydroNodeType::Aggregation; build_dual_expr_transform(
1320 TransformParams {
1321 structure,
1322 seen_tees,
1323 config,
1324 input,
1325 metadata,
1326 op_name: extract_op_name(self.print_root()),
1327 node_type,
1328 },
1329 init,
1330 acc,
1331 )
1332 }
1333
1334 HydroNode::ReduceKeyedWatermark {
1336 f,
1337 input,
1338 watermark,
1339 metadata,
1340 } => {
1341 let input_id = input.build_graph_structure(structure, seen_tees, config);
1342 let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
1343 let location_id = setup_location(structure, metadata);
1344 let join_node_id = structure.add_node_with_backtrace(
1345 NodeLabel::Static(extract_op_name(self.print_root())),
1346 HydroNodeType::Join,
1347 location_id,
1348 Some(metadata.op.backtrace.clone()),
1349 );
1350
1351 let input_metadata = input.metadata();
1353 add_edge_with_metadata(
1354 structure,
1355 input_id,
1356 join_node_id,
1357 Some(input_metadata),
1358 Some(metadata),
1359 Some("input".to_string()),
1360 );
1361
1362 let watermark_metadata = watermark.metadata();
1364 add_edge_with_metadata(
1365 structure,
1366 watermark_id,
1367 join_node_id,
1368 Some(watermark_metadata),
1369 Some(metadata),
1370 Some("watermark".to_string()),
1371 );
1372
1373 let node_id = structure.add_node_with_backtrace(
1374 NodeLabel::with_exprs(
1375 extract_op_name(self.print_root()).to_string(),
1376 vec![f.clone()],
1377 ),
1378 HydroNodeType::Aggregation,
1379 location_id,
1380 Some(metadata.op.backtrace.clone()),
1381 );
1382
1383 let join_metadata = metadata; add_edge_with_metadata(
1386 structure,
1387 join_node_id,
1388 node_id,
1389 Some(join_metadata),
1390 Some(metadata),
1391 None,
1392 );
1393
1394 node_id
1395 }
1396
1397 HydroNode::Network {
1398 serialize_fn,
1399 deserialize_fn,
1400 input,
1401 metadata,
1402 ..
1403 } => {
1404 let input_id = input.build_graph_structure(structure, seen_tees, config);
1405 let _from_location_id = setup_location(structure, metadata);
1406
1407 let to_location_id = match metadata.location_kind.root() {
1408 LocationId::Process(id) => {
1409 structure.add_location(*id, "Process".to_string());
1410 Some(*id)
1411 }
1412 LocationId::Cluster(id) => {
1413 structure.add_location(*id, "Cluster".to_string());
1414 Some(*id)
1415 }
1416 _ => None,
1417 };
1418
1419 let mut label = "network(".to_string();
1420 if serialize_fn.is_some() {
1421 label.push_str("send");
1422 }
1423 if deserialize_fn.is_some() {
1424 if serialize_fn.is_some() {
1425 label.push_str(" + ");
1426 }
1427 label.push_str("recv");
1428 }
1429 label.push(')');
1430
1431 let network_id = structure.add_node_with_backtrace(
1432 NodeLabel::Static(label),
1433 HydroNodeType::Network,
1434 to_location_id,
1435 Some(metadata.op.backtrace.clone()),
1436 );
1437
1438 let input_metadata = input.metadata();
1440 add_edge_with_metadata(
1441 structure,
1442 input_id,
1443 network_id,
1444 Some(input_metadata),
1445 Some(metadata),
1446 Some(format!("to {:?}", to_location_id)),
1447 );
1448
1449 network_id
1450 }
1451
1452 HydroNode::Batch { inner, metadata } => build_simple_transform(TransformParams {
1454 structure,
1455 seen_tees,
1456 config,
1457 input: inner,
1458 metadata,
1459 op_name: extract_op_name(self.print_root()),
1460 node_type: HydroNodeType::NonDeterministic,
1461 }),
1462
1463 HydroNode::YieldConcat { inner, .. } => {
1464 inner.build_graph_structure(structure, seen_tees, config)
1466 }
1467
1468 HydroNode::BeginAtomic { inner, .. } => {
1469 inner.build_graph_structure(structure, seen_tees, config)
1470 }
1471
1472 HydroNode::EndAtomic { inner, .. } => {
1473 inner.build_graph_structure(structure, seen_tees, config)
1474 }
1475
1476 HydroNode::Chain {
1477 first,
1478 second,
1479 metadata,
1480 } => {
1481 let first_id = first.build_graph_structure(structure, seen_tees, config);
1482 let second_id = second.build_graph_structure(structure, seen_tees, config);
1483 let location_id = setup_location(structure, metadata);
1484 let chain_id = structure.add_node_with_backtrace(
1485 NodeLabel::Static(extract_op_name(self.print_root())),
1486 HydroNodeType::Transform,
1487 location_id,
1488 Some(metadata.op.backtrace.clone()),
1489 );
1490
1491 let first_metadata = first.metadata();
1493 add_edge_with_metadata(
1494 structure,
1495 first_id,
1496 chain_id,
1497 Some(first_metadata),
1498 Some(metadata),
1499 Some("first".to_string()),
1500 );
1501
1502 let second_metadata = second.metadata();
1504 add_edge_with_metadata(
1505 structure,
1506 second_id,
1507 chain_id,
1508 Some(second_metadata),
1509 Some(metadata),
1510 Some("second".to_string()),
1511 );
1512
1513 chain_id
1514 }
1515
1516 HydroNode::ChainFirst {
1517 first,
1518 second,
1519 metadata,
1520 } => {
1521 let first_id = first.build_graph_structure(structure, seen_tees, config);
1522 let second_id = second.build_graph_structure(structure, seen_tees, config);
1523 let location_id = setup_location(structure, metadata);
1524 let chain_id = structure.add_node_with_backtrace(
1525 NodeLabel::Static(extract_op_name(self.print_root())),
1526 HydroNodeType::Transform,
1527 location_id,
1528 Some(metadata.op.backtrace.clone()),
1529 );
1530
1531 let first_metadata = first.metadata();
1533 add_edge_with_metadata(
1534 structure,
1535 first_id,
1536 chain_id,
1537 Some(first_metadata),
1538 Some(metadata),
1539 Some("first".to_string()),
1540 );
1541
1542 let second_metadata = second.metadata();
1544 add_edge_with_metadata(
1545 structure,
1546 second_id,
1547 chain_id,
1548 Some(second_metadata),
1549 Some(metadata),
1550 Some("second".to_string()),
1551 );
1552
1553 chain_id
1554 }
1555
1556 HydroNode::Counter {
1557 tag: _,
1558 prefix: _,
1559 duration,
1560 input,
1561 metadata,
1562 } => build_single_expr_transform(
1563 TransformParams {
1564 structure,
1565 seen_tees,
1566 config,
1567 input,
1568 metadata,
1569 op_name: extract_op_name(self.print_root()),
1570 node_type: HydroNodeType::Transform,
1571 },
1572 duration,
1573 ),
1574 }
1575 }
1576}
1577
1578macro_rules! render_hydro_ir {
1581 ($name:ident, $write_fn:ident) => {
1582 pub fn $name(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
1583 let mut output = String::new();
1584 $write_fn(&mut output, roots, config).unwrap();
1585 output
1586 }
1587 };
1588}
1589
1590macro_rules! write_hydro_ir {
1592 ($name:ident, $writer_type:ty, $constructor:expr) => {
1593 pub fn $name(
1594 output: impl std::fmt::Write,
1595 roots: &[HydroRoot],
1596 config: &HydroWriteConfig,
1597 ) -> std::fmt::Result {
1598 let mut graph_write: $writer_type = $constructor(output, config);
1599 write_hydro_ir_graph(&mut graph_write, roots, config)
1600 }
1601 };
1602}
1603
1604render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1605write_hydro_ir!(
1606 write_hydro_ir_mermaid,
1607 HydroMermaid<_>,
1608 HydroMermaid::new_with_config
1609);
1610
1611render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1612write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1613
1614render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);
1616
1617render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
1619write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
1620
1621fn write_hydro_ir_graph<W>(
1622 graph_write: W,
1623 roots: &[HydroRoot],
1624 config: &HydroWriteConfig,
1625) -> Result<(), W::Err>
1626where
1627 W: HydroGraphWrite,
1628{
1629 let mut structure = HydroGraphStructure::new();
1630 let mut seen_tees = HashMap::new();
1631
1632 for leaf in roots {
1634 leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1635 }
1636
1637 write_graph_structure(&structure, graph_write, config)
1638}