1use std::collections::{HashMap, HashSet};
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8pub use super::json::HydroJson;
9pub use super::mermaid::{HydroMermaid, escape_mermaid};
11use crate::compile::ir::backtrace::Backtrace;
12use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
13use crate::location::dynamic::LocationId;
14
15#[derive(Debug, Clone)]
17pub enum NodeLabel {
18 Static(String),
20 WithExprs {
22 op_name: String,
23 exprs: Vec<DebugExpr>,
24 },
25}
26
27impl NodeLabel {
28 pub fn static_label(s: String) -> Self {
30 Self::Static(s)
31 }
32
33 pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
35 Self::WithExprs { op_name, exprs }
36 }
37}
38
39impl std::fmt::Display for NodeLabel {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 Self::Static(s) => write!(f, "{}", s),
43 Self::WithExprs { op_name, exprs } => {
44 if exprs.is_empty() {
45 write!(f, "{}()", op_name)
46 } else {
47 let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
48 write!(f, "{}({})", op_name, expr_strs.join(", "))
49 }
50 }
51 }
52 }
53}
54
55pub struct IndentedGraphWriter<W> {
58 pub write: W,
59 pub indent: usize,
60 pub config: HydroWriteConfig,
61}
62
63impl<W> IndentedGraphWriter<W> {
64 pub fn new(write: W) -> Self {
66 Self {
67 write,
68 indent: 0,
69 config: HydroWriteConfig::default(),
70 }
71 }
72
73 pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
75 Self {
76 write,
77 indent: 0,
78 config: config.clone(),
79 }
80 }
81}
82
83impl<W: Write> IndentedGraphWriter<W> {
84 pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
86 writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
87 }
88}
89
90pub type GraphWriteError = std::fmt::Error;
92
93#[auto_impl(&mut, Box)]
95pub trait HydroGraphWrite {
96 type Err: Error;
98
99 fn write_prologue(&mut self) -> Result<(), Self::Err>;
101
102 fn write_node_definition(
104 &mut self,
105 node_id: usize,
106 node_label: &NodeLabel,
107 node_type: HydroNodeType,
108 location_id: Option<usize>,
109 location_type: Option<&str>,
110 backtrace: Option<&Backtrace>,
111 ) -> Result<(), Self::Err>;
112
113 fn write_edge(
115 &mut self,
116 src_id: usize,
117 dst_id: usize,
118 edge_properties: &HashSet<HydroEdgeProp>,
119 label: Option<&str>,
120 ) -> Result<(), Self::Err>;
121
122 fn write_location_start(
124 &mut self,
125 location_id: usize,
126 location_type: &str,
127 ) -> Result<(), Self::Err>;
128
129 fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;
131
132 fn write_location_end(&mut self) -> Result<(), Self::Err>;
134
135 fn write_epilogue(&mut self) -> Result<(), Self::Err>;
137}
138
139pub mod node_type_utils {
141 use super::HydroNodeType;
142
143 const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
145 (HydroNodeType::Source, "Source"),
146 (HydroNodeType::Transform, "Transform"),
147 (HydroNodeType::Join, "Join"),
148 (HydroNodeType::Aggregation, "Aggregation"),
149 (HydroNodeType::Network, "Network"),
150 (HydroNodeType::Sink, "Sink"),
151 (HydroNodeType::Tee, "Tee"),
152 ];
153
154 pub fn to_string(node_type: HydroNodeType) -> &'static str {
156 NODE_TYPE_DATA
157 .iter()
158 .find(|(nt, _)| *nt == node_type)
159 .map(|(_, name)| *name)
160 .unwrap_or("Unknown")
161 }
162
163 pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
165 NODE_TYPE_DATA.to_vec()
166 }
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq)]
171pub enum HydroNodeType {
172 Source,
173 Transform,
174 Join,
175 Aggregation,
176 Network,
177 Sink,
178 Tee,
179}
180
181#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
183pub enum HydroEdgeProp {
184 Bounded,
185 Unbounded,
186 TotalOrder,
187 NoOrder,
188 Keyed,
189 Stream,
191 KeyedSingleton,
192 KeyedStream,
193 Singleton,
194 Optional,
195 Network,
196 Cycle,
197}
198
199#[derive(Debug, Clone, PartialEq, Eq)]
202pub struct UnifiedEdgeStyle {
203 pub line_pattern: LinePattern,
205 pub line_width: u8,
207 pub arrowhead: ArrowheadStyle,
209 pub line_style: LineStyle,
211 pub halo: HaloStyle,
213 pub waviness: WavinessStyle,
215 pub animation: AnimationStyle,
217 pub color: &'static str,
219}
220
221#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub enum LinePattern {
223 Solid,
224 Dotted,
225 Dashed,
226}
227
228#[derive(Debug, Clone, Copy, PartialEq, Eq)]
229pub enum ArrowheadStyle {
230 TriangleFilled,
231 CircleFilled,
232 DiamondOpen,
233 Default,
234}
235
236#[derive(Debug, Clone, Copy, PartialEq, Eq)]
237pub enum LineStyle {
238 Single,
240 HashMarks,
242}
243
244#[derive(Debug, Clone, Copy, PartialEq, Eq)]
245pub enum HaloStyle {
246 None,
247 LightBlue,
248}
249
250#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251pub enum WavinessStyle {
252 None,
253 Wavy,
254}
255
256#[derive(Debug, Clone, Copy, PartialEq, Eq)]
257pub enum AnimationStyle {
258 Static,
259 Animated,
260}
261
262impl Default for UnifiedEdgeStyle {
263 fn default() -> Self {
264 Self {
265 line_pattern: LinePattern::Solid,
266 line_width: 1,
267 arrowhead: ArrowheadStyle::Default,
268 line_style: LineStyle::Single,
269 halo: HaloStyle::None,
270 waviness: WavinessStyle::None,
271 animation: AnimationStyle::Static,
272 color: "#666666",
273 }
274 }
275}
276
277pub fn get_unified_edge_style(
290 edge_properties: &HashSet<HydroEdgeProp>,
291 src_location: Option<usize>,
292 dst_location: Option<usize>,
293) -> UnifiedEdgeStyle {
294 let mut style = UnifiedEdgeStyle::default();
295
296 let is_network = edge_properties.contains(&HydroEdgeProp::Network)
298 || (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
299
300 if is_network {
301 style.line_pattern = LinePattern::Dashed;
302 style.animation = AnimationStyle::Animated;
303 } else {
304 style.line_pattern = LinePattern::Solid;
305 style.animation = AnimationStyle::Static;
306 }
307
308 if edge_properties.contains(&HydroEdgeProp::Unbounded) {
310 style.halo = HaloStyle::LightBlue;
311 } else {
312 style.halo = HaloStyle::None;
313 }
314
315 if edge_properties.contains(&HydroEdgeProp::Stream) {
317 style.arrowhead = ArrowheadStyle::TriangleFilled;
318 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
320 style.arrowhead = ArrowheadStyle::TriangleFilled;
321 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
323 style.arrowhead = ArrowheadStyle::TriangleFilled;
324 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
326 style.arrowhead = ArrowheadStyle::CircleFilled;
327 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Optional) {
329 style.arrowhead = ArrowheadStyle::DiamondOpen;
330 style.color = "#6b7280"; }
332
333 if edge_properties.contains(&HydroEdgeProp::Keyed) {
335 style.line_style = LineStyle::HashMarks; } else {
337 style.line_style = LineStyle::Single;
338 }
339
340 if edge_properties.contains(&HydroEdgeProp::NoOrder) {
342 style.waviness = WavinessStyle::Wavy;
343 } else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
344 style.waviness = WavinessStyle::None;
345 }
346
347 style
348}
349
350pub fn extract_edge_properties_from_collection_kind(
354 collection_kind: &crate::compile::ir::CollectionKind,
355) -> HashSet<HydroEdgeProp> {
356 use crate::compile::ir::CollectionKind;
357
358 let mut properties = HashSet::new();
359
360 match collection_kind {
361 CollectionKind::Stream { bound, order, .. } => {
362 properties.insert(HydroEdgeProp::Stream);
363 add_bound_property(&mut properties, bound);
364 add_order_property(&mut properties, order);
365 }
366 CollectionKind::KeyedStream {
367 bound, value_order, ..
368 } => {
369 properties.insert(HydroEdgeProp::KeyedStream);
370 properties.insert(HydroEdgeProp::Keyed);
371 add_bound_property(&mut properties, bound);
372 add_order_property(&mut properties, value_order);
373 }
374 CollectionKind::Singleton { bound, .. } => {
375 properties.insert(HydroEdgeProp::Singleton);
376 add_bound_property(&mut properties, bound);
377 properties.insert(HydroEdgeProp::TotalOrder);
379 }
380 CollectionKind::Optional { bound, .. } => {
381 properties.insert(HydroEdgeProp::Optional);
382 add_bound_property(&mut properties, bound);
383 properties.insert(HydroEdgeProp::TotalOrder);
385 }
386 CollectionKind::KeyedSingleton { bound, .. } => {
387 properties.insert(HydroEdgeProp::Singleton);
388 properties.insert(HydroEdgeProp::Keyed);
389 add_keyed_singleton_bound_property(&mut properties, bound);
391 properties.insert(HydroEdgeProp::TotalOrder);
392 }
393 }
394
395 properties
396}
397
398fn add_bound_property(
400 properties: &mut HashSet<HydroEdgeProp>,
401 bound: &crate::compile::ir::BoundKind,
402) {
403 use crate::compile::ir::BoundKind;
404
405 match bound {
406 BoundKind::Bounded => {
407 properties.insert(HydroEdgeProp::Bounded);
408 }
409 BoundKind::Unbounded => {
410 properties.insert(HydroEdgeProp::Unbounded);
411 }
412 }
413}
414
415fn add_keyed_singleton_bound_property(
417 properties: &mut HashSet<HydroEdgeProp>,
418 bound: &crate::compile::ir::KeyedSingletonBoundKind,
419) {
420 use crate::compile::ir::KeyedSingletonBoundKind;
421
422 match bound {
423 KeyedSingletonBoundKind::Bounded | KeyedSingletonBoundKind::BoundedValue => {
424 properties.insert(HydroEdgeProp::Bounded);
425 }
426 KeyedSingletonBoundKind::Unbounded => {
427 properties.insert(HydroEdgeProp::Unbounded);
428 }
429 }
430}
431
432fn add_order_property(
434 properties: &mut HashSet<HydroEdgeProp>,
435 order: &crate::compile::ir::StreamOrder,
436) {
437 use crate::compile::ir::StreamOrder;
438
439 match order {
440 StreamOrder::TotalOrder => {
441 properties.insert(HydroEdgeProp::TotalOrder);
442 }
443 StreamOrder::NoOrder => {
444 properties.insert(HydroEdgeProp::NoOrder);
445 }
446 }
447}
448
449pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
452 src_location.root() != dst_location.root()
454}
455
456pub fn add_network_edge_tag(
458 properties: &mut HashSet<HydroEdgeProp>,
459 src_location: &LocationId,
460 dst_location: &LocationId,
461) {
462 if is_network_edge(src_location, dst_location) {
463 properties.insert(HydroEdgeProp::Network);
464 }
465}
466
467#[derive(Debug, Clone)]
469pub struct HydroWriteConfig {
470 pub show_metadata: bool,
471 pub show_location_groups: bool,
472 pub use_short_labels: bool,
473 pub process_id_name: Vec<(usize, String)>,
474 pub cluster_id_name: Vec<(usize, String)>,
475 pub external_id_name: Vec<(usize, String)>,
476}
477
478impl Default for HydroWriteConfig {
479 fn default() -> Self {
480 Self {
481 show_metadata: false,
482 show_location_groups: true,
483 use_short_labels: true, process_id_name: vec![],
485 cluster_id_name: vec![],
486 external_id_name: vec![],
487 }
488 }
489}
490
491#[derive(Clone)]
493pub struct HydroGraphNode {
494 pub label: NodeLabel,
495 pub node_type: HydroNodeType,
496 pub location: Option<usize>,
497 pub backtrace: Option<Backtrace>,
498}
499
500#[derive(Debug, Clone)]
502pub struct HydroGraphEdge {
503 pub src: usize,
504 pub dst: usize,
505 pub edge_properties: HashSet<HydroEdgeProp>,
506 pub label: Option<String>,
507}
508
509#[derive(Default)]
511pub struct HydroGraphStructure {
512 pub nodes: HashMap<usize, HydroGraphNode>,
513 pub edges: Vec<HydroGraphEdge>,
514 pub locations: HashMap<usize, String>, pub next_node_id: usize,
516}
517
518impl HydroGraphStructure {
519 pub fn new() -> Self {
520 Self::default()
521 }
522
523 pub fn add_node(
524 &mut self,
525 label: NodeLabel,
526 node_type: HydroNodeType,
527 location: Option<usize>,
528 ) -> usize {
529 self.add_node_with_backtrace(label, node_type, location, None)
530 }
531
532 pub fn add_node_with_backtrace(
533 &mut self,
534 label: NodeLabel,
535 node_type: HydroNodeType,
536 location: Option<usize>,
537 backtrace: Option<Backtrace>,
538 ) -> usize {
539 let node_id = self.next_node_id;
540 self.next_node_id += 1;
541 self.nodes.insert(
542 node_id,
543 HydroGraphNode {
544 label,
545 node_type,
546 location,
547 backtrace,
548 },
549 );
550 node_id
551 }
552
553 pub fn add_node_with_metadata(
555 &mut self,
556 label: NodeLabel,
557 node_type: HydroNodeType,
558 metadata: &HydroIrMetadata,
559 ) -> usize {
560 let location = setup_location(self, metadata);
561 let backtrace = Some(metadata.op.backtrace.clone());
562 self.add_node_with_backtrace(label, node_type, location, backtrace)
563 }
564
565 pub fn add_edge(
566 &mut self,
567 src: usize,
568 dst: usize,
569 edge_properties: HashSet<HydroEdgeProp>,
570 label: Option<String>,
571 ) {
572 self.edges.push(HydroGraphEdge {
573 src,
574 dst,
575 edge_properties,
576 label,
577 });
578 }
579
580 pub fn add_edge_single(
582 &mut self,
583 src: usize,
584 dst: usize,
585 edge_type: HydroEdgeProp,
586 label: Option<String>,
587 ) {
588 let mut properties = HashSet::new();
589 properties.insert(edge_type);
590 self.edges.push(HydroGraphEdge {
591 src,
592 dst,
593 edge_properties: properties,
594 label,
595 });
596 }
597
598 pub fn add_location(&mut self, location_id: usize, location_type: String) {
599 self.locations.insert(location_id, location_type);
600 }
601}
602
603pub fn extract_op_name(full_label: String) -> String {
605 full_label
606 .split('(')
607 .next()
608 .unwrap_or("unknown")
609 .to_string()
610 .to_lowercase()
611}
612
613pub fn extract_short_label(full_label: &str) -> String {
615 if let Some(op_name) = full_label.split('(').next() {
617 let base_name = op_name.to_lowercase();
618 match base_name.as_str() {
619 "source" => {
621 if full_label.contains("Iter") {
622 "source_iter".to_string()
623 } else if full_label.contains("Stream") {
624 "source_stream".to_string()
625 } else if full_label.contains("ExternalNetwork") {
626 "external_network".to_string()
627 } else if full_label.contains("Spin") {
628 "spin".to_string()
629 } else {
630 "source".to_string()
631 }
632 }
633 "network" => {
634 if full_label.contains("deser") {
635 "network(recv)".to_string()
636 } else if full_label.contains("ser") {
637 "network(send)".to_string()
638 } else {
639 "network".to_string()
640 }
641 }
642 _ => base_name,
644 }
645 } else {
646 if full_label.len() > 20 {
648 format!("{}...", &full_label[..17])
649 } else {
650 full_label.to_string()
651 }
652 }
653}
654
655fn extract_location_id(location_id: &LocationId) -> (Option<usize>, Option<String>) {
657 match location_id.root() {
658 LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
659 LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
660 _ => panic!("unexpected location type"),
661 }
662}
663
664fn setup_location(
666 structure: &mut HydroGraphStructure,
667 metadata: &HydroIrMetadata,
668) -> Option<usize> {
669 let (location_id, location_type) = extract_location_id(&metadata.location_kind);
670 if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
671 structure.add_location(loc_id, loc_type);
672 }
673 location_id
674}
675
676fn add_edge_with_metadata(
679 structure: &mut HydroGraphStructure,
680 src_id: usize,
681 dst_id: usize,
682 src_metadata: Option<&HydroIrMetadata>,
683 dst_metadata: Option<&HydroIrMetadata>,
684 label: Option<String>,
685) {
686 let mut properties = HashSet::new();
687
688 if let Some(metadata) = src_metadata {
690 properties.extend(extract_edge_properties_from_collection_kind(
691 &metadata.collection_kind,
692 ));
693 }
694
695 if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
697 add_network_edge_tag(
698 &mut properties,
699 &src_meta.location_kind,
700 &dst_meta.location_kind,
701 );
702 }
703
704 if properties.is_empty() {
706 properties.insert(HydroEdgeProp::Stream);
707 }
708
709 structure.add_edge(src_id, dst_id, properties, label);
710}
711
712fn write_graph_structure<W>(
714 structure: &HydroGraphStructure,
715 graph_write: W,
716 config: &HydroWriteConfig,
717) -> Result<(), W::Err>
718where
719 W: HydroGraphWrite,
720{
721 let mut graph_write = graph_write;
722 graph_write.write_prologue()?;
724
725 for (&node_id, node) in &structure.nodes {
727 let (location_id, location_type) = if let Some(loc_id) = node.location {
728 (
729 Some(loc_id),
730 structure.locations.get(&loc_id).map(|s| s.as_str()),
731 )
732 } else {
733 (None, None)
734 };
735
736 graph_write.write_node_definition(
737 node_id,
738 &node.label,
739 node.node_type,
740 location_id,
741 location_type,
742 node.backtrace.as_ref(),
743 )?;
744 }
745
746 if config.show_location_groups {
748 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
749 for (&node_id, node) in &structure.nodes {
750 if let Some(location_id) = node.location {
751 nodes_by_location
752 .entry(location_id)
753 .or_default()
754 .push(node_id);
755 }
756 }
757
758 for (&location_id, node_ids) in &nodes_by_location {
759 if let Some(location_type) = structure.locations.get(&location_id) {
760 graph_write.write_location_start(location_id, location_type)?;
761 for &node_id in node_ids {
762 graph_write.write_node(node_id)?;
763 }
764 graph_write.write_location_end()?;
765 }
766 }
767 }
768
769 for edge in &structure.edges {
771 graph_write.write_edge(
772 edge.src,
773 edge.dst,
774 &edge.edge_properties,
775 edge.label.as_deref(),
776 )?;
777 }
778
779 graph_write.write_epilogue()?;
780 Ok(())
781}
782
783impl HydroRoot {
784 pub fn build_graph_structure(
786 &self,
787 structure: &mut HydroGraphStructure,
788 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
789 config: &HydroWriteConfig,
790 ) -> usize {
791 fn build_sink_node(
793 structure: &mut HydroGraphStructure,
794 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
795 config: &HydroWriteConfig,
796 input: &HydroNode,
797 sink_metadata: Option<&HydroIrMetadata>,
798 label: NodeLabel,
799 ) -> usize {
800 let input_id = input.build_graph_structure(structure, seen_tees, config);
801
802 let effective_metadata = if let Some(meta) = sink_metadata {
804 Some(meta)
805 } else {
806 match input {
807 HydroNode::Placeholder => None,
808 _ => Some(input.metadata()),
810 }
811 };
812
813 let location_id = effective_metadata.and_then(|m| setup_location(structure, m));
814 let sink_id = structure.add_node_with_backtrace(
815 label,
816 HydroNodeType::Sink,
817 location_id,
818 effective_metadata.map(|m| m.op.backtrace.clone()),
819 );
820
821 let input_metadata = input.metadata();
823 add_edge_with_metadata(
824 structure,
825 input_id,
826 sink_id,
827 Some(input_metadata),
828 sink_metadata,
829 None,
830 );
831
832 sink_id
833 }
834
835 match self {
836 HydroRoot::ForEach { f, input, .. } => build_sink_node(
838 structure,
839 seen_tees,
840 config,
841 input,
842 None,
843 NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
844 ),
845
846 HydroRoot::SendExternal {
847 to_external_id,
848 to_key,
849 input,
850 ..
851 } => build_sink_node(
852 structure,
853 seen_tees,
854 config,
855 input,
856 None,
857 NodeLabel::with_exprs(
858 format!("send_external({}:{})", to_external_id, to_key),
859 vec![],
860 ),
861 ),
862
863 HydroRoot::DestSink { sink, input, .. } => build_sink_node(
864 structure,
865 seen_tees,
866 config,
867 input,
868 None,
869 NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
870 ),
871
872 HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
873 structure,
874 seen_tees,
875 config,
876 input,
877 None,
878 NodeLabel::static_label(format!("cycle_sink({})", ident)),
879 ),
880 }
881 }
882}
883
884impl HydroNode {
885 pub fn build_graph_structure(
887 &self,
888 structure: &mut HydroGraphStructure,
889 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
890 config: &HydroWriteConfig,
891 ) -> usize {
892 use crate::location::dynamic::LocationId;
893
894 struct TransformParams<'a> {
898 structure: &'a mut HydroGraphStructure,
899 seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
900 config: &'a HydroWriteConfig,
901 input: &'a HydroNode,
902 metadata: &'a HydroIrMetadata,
903 op_name: String,
904 node_type: HydroNodeType,
905 }
906
907 fn build_simple_transform(params: TransformParams) -> usize {
909 let input_id = params.input.build_graph_structure(
910 params.structure,
911 params.seen_tees,
912 params.config,
913 );
914 let node_id = params.structure.add_node_with_metadata(
915 NodeLabel::Static(params.op_name.to_string()),
916 params.node_type,
917 params.metadata,
918 );
919
920 let input_metadata = params.input.metadata();
922 add_edge_with_metadata(
923 params.structure,
924 input_id,
925 node_id,
926 Some(input_metadata),
927 Some(params.metadata),
928 None,
929 );
930
931 node_id
932 }
933
934 fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
936 let input_id = params.input.build_graph_structure(
937 params.structure,
938 params.seen_tees,
939 params.config,
940 );
941 let node_id = params.structure.add_node_with_metadata(
942 NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
943 params.node_type,
944 params.metadata,
945 );
946
947 let input_metadata = params.input.metadata();
949 add_edge_with_metadata(
950 params.structure,
951 input_id,
952 node_id,
953 Some(input_metadata),
954 Some(params.metadata),
955 None,
956 );
957
958 node_id
959 }
960
961 fn build_dual_expr_transform(
963 params: TransformParams,
964 expr1: &DebugExpr,
965 expr2: &DebugExpr,
966 ) -> usize {
967 let input_id = params.input.build_graph_structure(
968 params.structure,
969 params.seen_tees,
970 params.config,
971 );
972 let node_id = params.structure.add_node_with_metadata(
973 NodeLabel::with_exprs(
974 params.op_name.to_string(),
975 vec![expr1.clone(), expr2.clone()],
976 ),
977 params.node_type,
978 params.metadata,
979 );
980
981 let input_metadata = params.input.metadata();
983 add_edge_with_metadata(
984 params.structure,
985 input_id,
986 node_id,
987 Some(input_metadata),
988 Some(params.metadata),
989 None,
990 );
991
992 node_id
993 }
994
995 fn build_source_node(
997 structure: &mut HydroGraphStructure,
998 metadata: &HydroIrMetadata,
999 label: String,
1000 ) -> usize {
1001 structure.add_node_with_metadata(
1002 NodeLabel::Static(label),
1003 HydroNodeType::Source,
1004 metadata,
1005 )
1006 }
1007
1008 match self {
1009 HydroNode::Placeholder => structure.add_node(
1010 NodeLabel::Static("PLACEHOLDER".to_string()),
1011 HydroNodeType::Transform,
1012 None,
1013 ),
1014
1015 HydroNode::Source {
1016 source, metadata, ..
1017 } => {
1018 let label = match source {
1019 HydroSource::Stream(expr) => format!("source_stream({})", expr),
1020 HydroSource::ExternalNetwork() => "external_network()".to_string(),
1021 HydroSource::Iter(expr) => format!("source_iter({})", expr),
1022 HydroSource::Spin() => "spin()".to_string(),
1023 };
1024 build_source_node(structure, metadata, label)
1025 }
1026
1027 HydroNode::SingletonSource { value, metadata } => {
1028 let label = format!("singleton({})", value);
1029 build_source_node(structure, metadata, label)
1030 }
1031
1032 HydroNode::ExternalInput {
1033 from_external_id,
1034 from_key,
1035 metadata,
1036 ..
1037 } => build_source_node(
1038 structure,
1039 metadata,
1040 format!("external_input({}:{})", from_external_id, from_key),
1041 ),
1042
1043 HydroNode::CycleSource {
1044 ident, metadata, ..
1045 } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
1046
1047 HydroNode::Tee { inner, metadata } => {
1048 let ptr = inner.as_ptr();
1049 if let Some(&existing_id) = seen_tees.get(&ptr) {
1050 return existing_id;
1051 }
1052
1053 let input_id = inner
1054 .0
1055 .borrow()
1056 .build_graph_structure(structure, seen_tees, config);
1057 let tee_id = structure.add_node_with_metadata(
1058 NodeLabel::Static(extract_op_name(self.print_root())),
1059 HydroNodeType::Tee,
1060 metadata,
1061 );
1062
1063 seen_tees.insert(ptr, tee_id);
1064
1065 let inner_borrow = inner.0.borrow();
1067 let input_metadata = inner_borrow.metadata();
1068 add_edge_with_metadata(
1069 structure,
1070 input_id,
1071 tee_id,
1072 Some(input_metadata),
1073 Some(metadata),
1074 None,
1075 );
1076 drop(inner_borrow);
1077
1078 tee_id
1079 }
1080
1081 HydroNode::Cast { inner, metadata }
1083 | HydroNode::ObserveNonDet {
1084 inner, metadata, ..
1085 }
1086 | HydroNode::DeferTick {
1087 input: inner,
1088 metadata,
1089 }
1090 | HydroNode::Enumerate {
1091 input: inner,
1092 metadata,
1093 ..
1094 }
1095 | HydroNode::Unique {
1096 input: inner,
1097 metadata,
1098 }
1099 | HydroNode::ResolveFutures {
1100 input: inner,
1101 metadata,
1102 }
1103 | HydroNode::ResolveFuturesOrdered {
1104 input: inner,
1105 metadata,
1106 } => build_simple_transform(TransformParams {
1107 structure,
1108 seen_tees,
1109 config,
1110 input: inner,
1111 metadata,
1112 op_name: extract_op_name(self.print_root()),
1113 node_type: HydroNodeType::Transform,
1114 }),
1115
1116 HydroNode::Persist { inner, metadata } => build_simple_transform(TransformParams {
1118 structure,
1119 seen_tees,
1120 config,
1121 input: inner,
1122 metadata,
1123 op_name: extract_op_name(self.print_root()),
1124 node_type: HydroNodeType::Transform,
1125 }),
1126
1127 HydroNode::Sort {
1129 input: inner,
1130 metadata,
1131 } => build_simple_transform(TransformParams {
1132 structure,
1133 seen_tees,
1134 config,
1135 input: inner,
1136 metadata,
1137 op_name: extract_op_name(self.print_root()),
1138 node_type: HydroNodeType::Aggregation,
1139 }),
1140
1141 HydroNode::Map { f, input, metadata }
1143 | HydroNode::Filter { f, input, metadata }
1144 | HydroNode::FlatMap { f, input, metadata }
1145 | HydroNode::FilterMap { f, input, metadata }
1146 | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
1147 TransformParams {
1148 structure,
1149 seen_tees,
1150 config,
1151 input,
1152 metadata,
1153 op_name: extract_op_name(self.print_root()),
1154 node_type: HydroNodeType::Transform,
1155 },
1156 f,
1157 ),
1158
1159 HydroNode::Reduce { f, input, metadata }
1161 | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
1162 TransformParams {
1163 structure,
1164 seen_tees,
1165 config,
1166 input,
1167 metadata,
1168 op_name: extract_op_name(self.print_root()),
1169 node_type: HydroNodeType::Aggregation,
1170 },
1171 f,
1172 ),
1173
1174 HydroNode::Join {
1176 left,
1177 right,
1178 metadata,
1179 }
1180 | HydroNode::CrossProduct {
1181 left,
1182 right,
1183 metadata,
1184 }
1185 | HydroNode::CrossSingleton {
1186 left,
1187 right,
1188 metadata,
1189 } => {
1190 let left_id = left.build_graph_structure(structure, seen_tees, config);
1191 let right_id = right.build_graph_structure(structure, seen_tees, config);
1192 let node_id = structure.add_node_with_metadata(
1193 NodeLabel::Static(extract_op_name(self.print_root())),
1194 HydroNodeType::Join,
1195 metadata,
1196 );
1197
1198 let left_metadata = left.metadata();
1200 add_edge_with_metadata(
1201 structure,
1202 left_id,
1203 node_id,
1204 Some(left_metadata),
1205 Some(metadata),
1206 Some("left".to_string()),
1207 );
1208
1209 let right_metadata = right.metadata();
1211 add_edge_with_metadata(
1212 structure,
1213 right_id,
1214 node_id,
1215 Some(right_metadata),
1216 Some(metadata),
1217 Some("right".to_string()),
1218 );
1219
1220 node_id
1221 }
1222
1223 HydroNode::Difference {
1225 pos: left,
1226 neg: right,
1227 metadata,
1228 }
1229 | HydroNode::AntiJoin {
1230 pos: left,
1231 neg: right,
1232 metadata,
1233 } => {
1234 let left_id = left.build_graph_structure(structure, seen_tees, config);
1235 let right_id = right.build_graph_structure(structure, seen_tees, config);
1236 let node_id = structure.add_node_with_metadata(
1237 NodeLabel::Static(extract_op_name(self.print_root())),
1238 HydroNodeType::Join,
1239 metadata,
1240 );
1241
1242 let left_metadata = left.metadata();
1244 add_edge_with_metadata(
1245 structure,
1246 left_id,
1247 node_id,
1248 Some(left_metadata),
1249 Some(metadata),
1250 Some("pos".to_string()),
1251 );
1252
1253 let right_metadata = right.metadata();
1255 add_edge_with_metadata(
1256 structure,
1257 right_id,
1258 node_id,
1259 Some(right_metadata),
1260 Some(metadata),
1261 Some("neg".to_string()),
1262 );
1263
1264 node_id
1265 }
1266
1267 HydroNode::Fold {
1269 init,
1270 acc,
1271 input,
1272 metadata,
1273 }
1274 | HydroNode::FoldKeyed {
1275 init,
1276 acc,
1277 input,
1278 metadata,
1279 }
1280 | HydroNode::Scan {
1281 init,
1282 acc,
1283 input,
1284 metadata,
1285 } => {
1286 let node_type = HydroNodeType::Aggregation; build_dual_expr_transform(
1289 TransformParams {
1290 structure,
1291 seen_tees,
1292 config,
1293 input,
1294 metadata,
1295 op_name: extract_op_name(self.print_root()),
1296 node_type,
1297 },
1298 init,
1299 acc,
1300 )
1301 }
1302
1303 HydroNode::ReduceKeyedWatermark {
1305 f,
1306 input,
1307 watermark,
1308 metadata,
1309 } => {
1310 let input_id = input.build_graph_structure(structure, seen_tees, config);
1311 let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
1312 let location_id = setup_location(structure, metadata);
1313 let join_node_id = structure.add_node_with_backtrace(
1314 NodeLabel::Static(extract_op_name(self.print_root())),
1315 HydroNodeType::Join,
1316 location_id,
1317 Some(metadata.op.backtrace.clone()),
1318 );
1319
1320 let input_metadata = input.metadata();
1322 add_edge_with_metadata(
1323 structure,
1324 input_id,
1325 join_node_id,
1326 Some(input_metadata),
1327 Some(metadata),
1328 Some("input".to_string()),
1329 );
1330
1331 let watermark_metadata = watermark.metadata();
1333 add_edge_with_metadata(
1334 structure,
1335 watermark_id,
1336 join_node_id,
1337 Some(watermark_metadata),
1338 Some(metadata),
1339 Some("watermark".to_string()),
1340 );
1341
1342 let node_id = structure.add_node_with_backtrace(
1343 NodeLabel::with_exprs(
1344 extract_op_name(self.print_root()).to_string(),
1345 vec![f.clone()],
1346 ),
1347 HydroNodeType::Aggregation,
1348 location_id,
1349 Some(metadata.op.backtrace.clone()),
1350 );
1351
1352 let join_metadata = metadata; add_edge_with_metadata(
1355 structure,
1356 join_node_id,
1357 node_id,
1358 Some(join_metadata),
1359 Some(metadata),
1360 None,
1361 );
1362
1363 node_id
1364 }
1365
1366 HydroNode::Network {
1367 serialize_fn,
1368 deserialize_fn,
1369 input,
1370 metadata,
1371 ..
1372 } => {
1373 let input_id = input.build_graph_structure(structure, seen_tees, config);
1374 let _from_location_id = setup_location(structure, metadata);
1375
1376 let to_location_id = match metadata.location_kind.root() {
1377 LocationId::Process(id) => {
1378 structure.add_location(*id, "Process".to_string());
1379 Some(*id)
1380 }
1381 LocationId::Cluster(id) => {
1382 structure.add_location(*id, "Cluster".to_string());
1383 Some(*id)
1384 }
1385 _ => None,
1386 };
1387
1388 let mut label = "network(".to_string();
1389 if serialize_fn.is_some() {
1390 label.push_str("send");
1391 }
1392 if deserialize_fn.is_some() {
1393 if serialize_fn.is_some() {
1394 label.push_str(" + ");
1395 }
1396 label.push_str("recv");
1397 }
1398 label.push(')');
1399
1400 let network_id = structure.add_node_with_backtrace(
1401 NodeLabel::Static(label),
1402 HydroNodeType::Network,
1403 to_location_id,
1404 Some(metadata.op.backtrace.clone()),
1405 );
1406
1407 let input_metadata = input.metadata();
1409 add_edge_with_metadata(
1410 structure,
1411 input_id,
1412 network_id,
1413 Some(input_metadata),
1414 Some(metadata),
1415 Some(format!("to {:?}", to_location_id)),
1416 );
1417
1418 network_id
1419 }
1420
1421 HydroNode::Batch { inner, .. } => {
1423 inner.build_graph_structure(structure, seen_tees, config)
1425 }
1426
1427 HydroNode::YieldConcat { inner, .. } => {
1428 inner.build_graph_structure(structure, seen_tees, config)
1430 }
1431
1432 HydroNode::BeginAtomic { inner, .. } => {
1433 inner.build_graph_structure(structure, seen_tees, config)
1434 }
1435
1436 HydroNode::EndAtomic { inner, .. } => {
1437 inner.build_graph_structure(structure, seen_tees, config)
1438 }
1439
1440 HydroNode::Chain {
1441 first,
1442 second,
1443 metadata,
1444 } => {
1445 let first_id = first.build_graph_structure(structure, seen_tees, config);
1446 let second_id = second.build_graph_structure(structure, seen_tees, config);
1447 let location_id = setup_location(structure, metadata);
1448 let chain_id = structure.add_node_with_backtrace(
1449 NodeLabel::Static(extract_op_name(self.print_root())),
1450 HydroNodeType::Transform,
1451 location_id,
1452 Some(metadata.op.backtrace.clone()),
1453 );
1454
1455 let first_metadata = first.metadata();
1457 add_edge_with_metadata(
1458 structure,
1459 first_id,
1460 chain_id,
1461 Some(first_metadata),
1462 Some(metadata),
1463 Some("first".to_string()),
1464 );
1465
1466 let second_metadata = second.metadata();
1468 add_edge_with_metadata(
1469 structure,
1470 second_id,
1471 chain_id,
1472 Some(second_metadata),
1473 Some(metadata),
1474 Some("second".to_string()),
1475 );
1476
1477 chain_id
1478 }
1479
1480 HydroNode::ChainFirst {
1481 first,
1482 second,
1483 metadata,
1484 } => {
1485 let first_id = first.build_graph_structure(structure, seen_tees, config);
1486 let second_id = second.build_graph_structure(structure, seen_tees, config);
1487 let location_id = setup_location(structure, metadata);
1488 let chain_id = structure.add_node_with_backtrace(
1489 NodeLabel::Static(extract_op_name(self.print_root())),
1490 HydroNodeType::Transform,
1491 location_id,
1492 Some(metadata.op.backtrace.clone()),
1493 );
1494
1495 let first_metadata = first.metadata();
1497 add_edge_with_metadata(
1498 structure,
1499 first_id,
1500 chain_id,
1501 Some(first_metadata),
1502 Some(metadata),
1503 Some("first".to_string()),
1504 );
1505
1506 let second_metadata = second.metadata();
1508 add_edge_with_metadata(
1509 structure,
1510 second_id,
1511 chain_id,
1512 Some(second_metadata),
1513 Some(metadata),
1514 Some("second".to_string()),
1515 );
1516
1517 chain_id
1518 }
1519
1520 HydroNode::Counter {
1521 tag: _,
1522 prefix: _,
1523 duration,
1524 input,
1525 metadata,
1526 } => build_single_expr_transform(
1527 TransformParams {
1528 structure,
1529 seen_tees,
1530 config,
1531 input,
1532 metadata,
1533 op_name: extract_op_name(self.print_root()),
1534 node_type: HydroNodeType::Transform,
1535 },
1536 duration,
1537 ),
1538 }
1539 }
1540}
1541
1542macro_rules! render_hydro_ir {
1545 ($name:ident, $write_fn:ident) => {
1546 pub fn $name(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
1547 let mut output = String::new();
1548 $write_fn(&mut output, roots, config).unwrap();
1549 output
1550 }
1551 };
1552}
1553
1554macro_rules! write_hydro_ir {
1556 ($name:ident, $writer_type:ty, $constructor:expr) => {
1557 pub fn $name(
1558 output: impl std::fmt::Write,
1559 roots: &[HydroRoot],
1560 config: &HydroWriteConfig,
1561 ) -> std::fmt::Result {
1562 let mut graph_write: $writer_type = $constructor(output, config);
1563 write_hydro_ir_graph(&mut graph_write, roots, config)
1564 }
1565 };
1566}
1567
1568render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1569write_hydro_ir!(
1570 write_hydro_ir_mermaid,
1571 HydroMermaid<_>,
1572 HydroMermaid::new_with_config
1573);
1574
1575render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1576write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1577
1578render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);
1580
1581render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
1583write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
1584
1585fn write_hydro_ir_graph<W>(
1586 graph_write: W,
1587 roots: &[HydroRoot],
1588 config: &HydroWriteConfig,
1589) -> Result<(), W::Err>
1590where
1591 W: HydroGraphWrite,
1592{
1593 let mut structure = HydroGraphStructure::new();
1594 let mut seen_tees = HashMap::new();
1595
1596 for leaf in roots {
1598 leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1599 }
1600
1601 write_graph_structure(&structure, graph_write, config)
1602}