1use std::collections::{HashMap, HashSet};
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8pub use super::json::HydroJson;
9pub use super::mermaid::{HydroMermaid, escape_mermaid};
11use crate::compile::ir::backtrace::Backtrace;
12use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
13use crate::location::dynamic::LocationId;
14
15#[derive(Debug, Clone)]
17pub enum NodeLabel {
18 Static(String),
20 WithExprs {
22 op_name: String,
23 exprs: Vec<DebugExpr>,
24 },
25}
26
27impl NodeLabel {
28 pub fn static_label(s: String) -> Self {
30 Self::Static(s)
31 }
32
33 pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
35 Self::WithExprs { op_name, exprs }
36 }
37}
38
39impl std::fmt::Display for NodeLabel {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 Self::Static(s) => write!(f, "{}", s),
43 Self::WithExprs { op_name, exprs } => {
44 if exprs.is_empty() {
45 write!(f, "{}()", op_name)
46 } else {
47 let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
48 write!(f, "{}({})", op_name, expr_strs.join(", "))
49 }
50 }
51 }
52 }
53}
54
55pub struct IndentedGraphWriter<W> {
58 pub write: W,
59 pub indent: usize,
60 pub config: HydroWriteConfig,
61}
62
63impl<W> IndentedGraphWriter<W> {
64 pub fn new(write: W) -> Self {
66 Self {
67 write,
68 indent: 0,
69 config: HydroWriteConfig::default(),
70 }
71 }
72
73 pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
75 Self {
76 write,
77 indent: 0,
78 config: config.clone(),
79 }
80 }
81}
82
83impl<W: Write> IndentedGraphWriter<W> {
84 pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
86 writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
87 }
88}
89
/// Error type alias for graph writing; currently [`std::fmt::Error`].
pub type GraphWriteError = std::fmt::Error;
92
93#[auto_impl(&mut, Box)]
95pub trait HydroGraphWrite {
96 type Err: Error;
98
99 fn write_prologue(&mut self) -> Result<(), Self::Err>;
101
102 fn write_node_definition(
104 &mut self,
105 node_id: usize,
106 node_label: &NodeLabel,
107 node_type: HydroNodeType,
108 location_id: Option<usize>,
109 location_type: Option<&str>,
110 backtrace: Option<&Backtrace>,
111 ) -> Result<(), Self::Err>;
112
113 fn write_edge(
115 &mut self,
116 src_id: usize,
117 dst_id: usize,
118 edge_properties: &HashSet<HydroEdgeProp>,
119 label: Option<&str>,
120 ) -> Result<(), Self::Err>;
121
122 fn write_location_start(
124 &mut self,
125 location_id: usize,
126 location_type: &str,
127 ) -> Result<(), Self::Err>;
128
129 fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;
131
132 fn write_location_end(&mut self) -> Result<(), Self::Err>;
134
135 fn write_epilogue(&mut self) -> Result<(), Self::Err>;
137}
138
139pub mod node_type_utils {
141 use super::HydroNodeType;
142
143 const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
145 (HydroNodeType::Source, "Source"),
146 (HydroNodeType::Transform, "Transform"),
147 (HydroNodeType::Join, "Join"),
148 (HydroNodeType::Aggregation, "Aggregation"),
149 (HydroNodeType::Network, "Network"),
150 (HydroNodeType::Sink, "Sink"),
151 (HydroNodeType::Tee, "Tee"),
152 (HydroNodeType::NonDeterministic, "NonDeterministic"),
153 ];
154
155 pub fn to_string(node_type: HydroNodeType) -> &'static str {
157 NODE_TYPE_DATA
158 .iter()
159 .find(|(nt, _)| *nt == node_type)
160 .map(|(_, name)| *name)
161 .unwrap_or("Unknown")
162 }
163
164 pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
166 NODE_TYPE_DATA.to_vec()
167 }
168}
169
/// Category assigned to a graph node.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HydroNodeType {
    /// A node that produces data.
    Source,
    /// A node that transforms its input.
    Transform,
    /// A node that combines more than one input.
    Join,
    /// A node that aggregates its input.
    Aggregation,
    /// A node that represents network communication.
    Network,
    /// A node that consumes data.
    Sink,
    /// A node whose output is shared between consumers.
    Tee,
    /// A node marked as non-deterministic.
    NonDeterministic,
}
182
/// Property tag attached to a graph edge; an edge carries a set of these.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum HydroEdgeProp {
    /// The collection on the edge is bounded.
    Bounded,
    /// The collection on the edge is unbounded.
    Unbounded,
    /// Elements are totally ordered.
    TotalOrder,
    /// Elements carry no ordering.
    NoOrder,
    /// The collection is keyed.
    Keyed,
    /// The edge carries a stream.
    Stream,
    /// The edge carries a keyed singleton.
    KeyedSingleton,
    /// The edge carries a keyed stream.
    KeyedStream,
    /// The edge carries a singleton.
    Singleton,
    /// The edge carries an optional.
    Optional,
    /// The edge crosses between locations.
    Network,
    /// The edge is part of a cycle.
    Cycle,
}
200
201#[derive(Debug, Clone, PartialEq, Eq)]
204pub struct UnifiedEdgeStyle {
205 pub line_pattern: LinePattern,
207 pub line_width: u8,
209 pub arrowhead: ArrowheadStyle,
211 pub line_style: LineStyle,
213 pub halo: HaloStyle,
215 pub waviness: WavinessStyle,
217 pub animation: AnimationStyle,
219 pub color: &'static str,
221}
222
/// Stroke pattern of an edge line.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LinePattern {
    /// Continuous line.
    Solid,
    /// Dotted line.
    Dotted,
    /// Dashed line.
    Dashed,
}
229
/// Shape drawn at the head of an edge.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ArrowheadStyle {
    /// Filled triangle.
    TriangleFilled,
    /// Filled circle.
    CircleFilled,
    /// Open diamond.
    DiamondOpen,
    /// Whatever the output format draws by default.
    Default,
}
237
/// Decoration applied along an edge line.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LineStyle {
    /// Plain single line.
    Single,
    /// Line decorated with hash marks.
    HashMarks,
}
245
/// Halo drawn around an edge line.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HaloStyle {
    /// No halo.
    None,
    /// Light-blue halo.
    LightBlue,
}
251
/// Whether an edge line is drawn straight or wavy.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WavinessStyle {
    /// Straight line.
    None,
    /// Wavy line.
    Wavy,
}
257
/// Whether an edge line is animated.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AnimationStyle {
    /// No animation.
    Static,
    /// Animated line.
    Animated,
}
263
264impl Default for UnifiedEdgeStyle {
265 fn default() -> Self {
266 Self {
267 line_pattern: LinePattern::Solid,
268 line_width: 1,
269 arrowhead: ArrowheadStyle::Default,
270 line_style: LineStyle::Single,
271 halo: HaloStyle::None,
272 waviness: WavinessStyle::None,
273 animation: AnimationStyle::Static,
274 color: "#666666",
275 }
276 }
277}
278
279pub fn get_unified_edge_style(
292 edge_properties: &HashSet<HydroEdgeProp>,
293 src_location: Option<usize>,
294 dst_location: Option<usize>,
295) -> UnifiedEdgeStyle {
296 let mut style = UnifiedEdgeStyle::default();
297
298 let is_network = edge_properties.contains(&HydroEdgeProp::Network)
300 || (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
301
302 if is_network {
303 style.line_pattern = LinePattern::Dashed;
304 style.animation = AnimationStyle::Animated;
305 } else {
306 style.line_pattern = LinePattern::Solid;
307 style.animation = AnimationStyle::Static;
308 }
309
310 if edge_properties.contains(&HydroEdgeProp::Unbounded) {
312 style.halo = HaloStyle::LightBlue;
313 } else {
314 style.halo = HaloStyle::None;
315 }
316
317 if edge_properties.contains(&HydroEdgeProp::Stream) {
319 style.arrowhead = ArrowheadStyle::TriangleFilled;
320 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
322 style.arrowhead = ArrowheadStyle::TriangleFilled;
323 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
325 style.arrowhead = ArrowheadStyle::TriangleFilled;
326 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
328 style.arrowhead = ArrowheadStyle::CircleFilled;
329 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Optional) {
331 style.arrowhead = ArrowheadStyle::DiamondOpen;
332 style.color = "#6b7280"; }
334
335 if edge_properties.contains(&HydroEdgeProp::Keyed) {
337 style.line_style = LineStyle::HashMarks; } else {
339 style.line_style = LineStyle::Single;
340 }
341
342 if edge_properties.contains(&HydroEdgeProp::NoOrder) {
344 style.waviness = WavinessStyle::Wavy;
345 } else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
346 style.waviness = WavinessStyle::None;
347 }
348
349 style
350}
351
352pub fn extract_edge_properties_from_collection_kind(
356 collection_kind: &crate::compile::ir::CollectionKind,
357) -> HashSet<HydroEdgeProp> {
358 use crate::compile::ir::CollectionKind;
359
360 let mut properties = HashSet::new();
361
362 match collection_kind {
363 CollectionKind::Stream { bound, order, .. } => {
364 properties.insert(HydroEdgeProp::Stream);
365 add_bound_property(&mut properties, bound);
366 add_order_property(&mut properties, order);
367 }
368 CollectionKind::KeyedStream {
369 bound, value_order, ..
370 } => {
371 properties.insert(HydroEdgeProp::KeyedStream);
372 properties.insert(HydroEdgeProp::Keyed);
373 add_bound_property(&mut properties, bound);
374 add_order_property(&mut properties, value_order);
375 }
376 CollectionKind::Singleton { bound, .. } => {
377 properties.insert(HydroEdgeProp::Singleton);
378 add_bound_property(&mut properties, bound);
379 properties.insert(HydroEdgeProp::TotalOrder);
381 }
382 CollectionKind::Optional { bound, .. } => {
383 properties.insert(HydroEdgeProp::Optional);
384 add_bound_property(&mut properties, bound);
385 properties.insert(HydroEdgeProp::TotalOrder);
387 }
388 CollectionKind::KeyedSingleton { bound, .. } => {
389 properties.insert(HydroEdgeProp::Singleton);
390 properties.insert(HydroEdgeProp::Keyed);
391 add_keyed_singleton_bound_property(&mut properties, bound);
393 properties.insert(HydroEdgeProp::TotalOrder);
394 }
395 }
396
397 properties
398}
399
400fn add_bound_property(
402 properties: &mut HashSet<HydroEdgeProp>,
403 bound: &crate::compile::ir::BoundKind,
404) {
405 use crate::compile::ir::BoundKind;
406
407 match bound {
408 BoundKind::Bounded => {
409 properties.insert(HydroEdgeProp::Bounded);
410 }
411 BoundKind::Unbounded => {
412 properties.insert(HydroEdgeProp::Unbounded);
413 }
414 }
415}
416
417fn add_keyed_singleton_bound_property(
419 properties: &mut HashSet<HydroEdgeProp>,
420 bound: &crate::compile::ir::KeyedSingletonBoundKind,
421) {
422 use crate::compile::ir::KeyedSingletonBoundKind;
423
424 match bound {
425 KeyedSingletonBoundKind::Bounded | KeyedSingletonBoundKind::BoundedValue => {
426 properties.insert(HydroEdgeProp::Bounded);
427 }
428 KeyedSingletonBoundKind::Unbounded => {
429 properties.insert(HydroEdgeProp::Unbounded);
430 }
431 }
432}
433
434fn add_order_property(
436 properties: &mut HashSet<HydroEdgeProp>,
437 order: &crate::compile::ir::StreamOrder,
438) {
439 use crate::compile::ir::StreamOrder;
440
441 match order {
442 StreamOrder::TotalOrder => {
443 properties.insert(HydroEdgeProp::TotalOrder);
444 }
445 StreamOrder::NoOrder => {
446 properties.insert(HydroEdgeProp::NoOrder);
447 }
448 }
449}
450
451pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
454 src_location.root() != dst_location.root()
456}
457
458pub fn add_network_edge_tag(
460 properties: &mut HashSet<HydroEdgeProp>,
461 src_location: &LocationId,
462 dst_location: &LocationId,
463) {
464 if is_network_edge(src_location, dst_location) {
465 properties.insert(HydroEdgeProp::Network);
466 }
467}
468
/// Options controlling how a graph is written.
#[derive(Clone, Debug)]
pub struct HydroWriteConfig {
    /// Whether metadata is shown (consumers are outside this section).
    pub show_metadata: bool,
    /// Whether nodes are grouped by location in the output.
    pub show_location_groups: bool,
    /// Whether shortened node labels are used (consumers are outside this
    /// section).
    pub use_short_labels: bool,
    /// `(id, name)` pairs — presumably process ids to display names; verify
    /// against the writers.
    pub process_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs — presumably cluster ids to display names; verify
    /// against the writers.
    pub cluster_id_name: Vec<(usize, String)>,
    /// `(id, name)` pairs — presumably external ids to display names; verify
    /// against the writers.
    pub external_id_name: Vec<(usize, String)>,
}
479
480impl Default for HydroWriteConfig {
481 fn default() -> Self {
482 Self {
483 show_metadata: false,
484 show_location_groups: true,
485 use_short_labels: true, process_id_name: vec![],
487 cluster_id_name: vec![],
488 external_id_name: vec![],
489 }
490 }
491}
492
493#[derive(Clone)]
495pub struct HydroGraphNode {
496 pub label: NodeLabel,
497 pub node_type: HydroNodeType,
498 pub location: Option<usize>,
499 pub backtrace: Option<Backtrace>,
500}
501
502#[derive(Debug, Clone)]
504pub struct HydroGraphEdge {
505 pub src: usize,
506 pub dst: usize,
507 pub edge_properties: HashSet<HydroEdgeProp>,
508 pub label: Option<String>,
509}
510
511#[derive(Default)]
513pub struct HydroGraphStructure {
514 pub nodes: HashMap<usize, HydroGraphNode>,
515 pub edges: Vec<HydroGraphEdge>,
516 pub locations: HashMap<usize, String>, pub next_node_id: usize,
518}
519
520impl HydroGraphStructure {
521 pub fn new() -> Self {
522 Self::default()
523 }
524
525 pub fn add_node(
526 &mut self,
527 label: NodeLabel,
528 node_type: HydroNodeType,
529 location: Option<usize>,
530 ) -> usize {
531 self.add_node_with_backtrace(label, node_type, location, None)
532 }
533
534 pub fn add_node_with_backtrace(
535 &mut self,
536 label: NodeLabel,
537 node_type: HydroNodeType,
538 location: Option<usize>,
539 backtrace: Option<Backtrace>,
540 ) -> usize {
541 let node_id = self.next_node_id;
542 self.next_node_id += 1;
543 self.nodes.insert(
544 node_id,
545 HydroGraphNode {
546 label,
547 node_type,
548 location,
549 backtrace,
550 },
551 );
552 node_id
553 }
554
555 pub fn add_node_with_metadata(
557 &mut self,
558 label: NodeLabel,
559 node_type: HydroNodeType,
560 metadata: &HydroIrMetadata,
561 ) -> usize {
562 let location = setup_location(self, metadata);
563 let backtrace = Some(metadata.op.backtrace.clone());
564 self.add_node_with_backtrace(label, node_type, location, backtrace)
565 }
566
567 pub fn add_edge(
568 &mut self,
569 src: usize,
570 dst: usize,
571 edge_properties: HashSet<HydroEdgeProp>,
572 label: Option<String>,
573 ) {
574 self.edges.push(HydroGraphEdge {
575 src,
576 dst,
577 edge_properties,
578 label,
579 });
580 }
581
582 pub fn add_edge_single(
584 &mut self,
585 src: usize,
586 dst: usize,
587 edge_type: HydroEdgeProp,
588 label: Option<String>,
589 ) {
590 let mut properties = HashSet::new();
591 properties.insert(edge_type);
592 self.edges.push(HydroGraphEdge {
593 src,
594 dst,
595 edge_properties: properties,
596 label,
597 });
598 }
599
600 pub fn add_location(&mut self, location_id: usize, location_type: String) {
601 self.locations.insert(location_id, location_type);
602 }
603}
604
/// Returns the lowercase operator name of a printed node: everything before
/// the first `(` in `full_label`, or the whole label when it has no `(`.
pub fn extract_op_name(full_label: String) -> String {
    // `split` always yields at least one item, so the `"unknown"` fallback is
    // never taken; it is kept only to avoid an `unwrap`.
    full_label
        .split('(')
        .next()
        .unwrap_or("unknown")
        .to_lowercase()
}
614
/// Produces a short display label from a full node label.
///
/// The operator name is the text before the first `(`, lowercased. Two
/// operators are refined by inspecting the full label (case-sensitively):
///
/// * `source` becomes `source_iter`, `source_stream`, `external_network` or
///   `spin` when the label contains `Iter`, `Stream`, `ExternalNetwork` or
///   `Spin` (first match in that order), otherwise `source`.
/// * `network` becomes `network(recv)` when the label contains `deser`,
///   `network(send)` when it contains `ser`, otherwise `network`.
///
/// Every other operator name is returned as is.
pub fn extract_short_label(full_label: &str) -> String {
    // `split` always yields at least one item, so no truncation fallback is
    // needed; the earlier fallback sliced by byte index and could have
    // panicked on multi-byte text.
    let base_name = full_label
        .split('(')
        .next()
        .unwrap_or(full_label)
        .to_lowercase();

    let refined = match base_name.as_str() {
        "source" => {
            // Checked in priority order; the first marker found wins.
            const SOURCE_KINDS: [(&str, &str); 4] = [
                ("Iter", "source_iter"),
                ("Stream", "source_stream"),
                ("ExternalNetwork", "external_network"),
                ("Spin", "spin"),
            ];
            SOURCE_KINDS
                .iter()
                .find(|(marker, _)| full_label.contains(marker))
                .map_or("source", |&(_, short)| short)
        }
        "network" => {
            // "deser" must be tested first because it contains "ser".
            if full_label.contains("deser") {
                "network(recv)"
            } else if full_label.contains("ser") {
                "network(send)"
            } else {
                "network"
            }
        }
        _ => return base_name,
    };

    refined.to_string()
}
656
657fn extract_location_id(location_id: &LocationId) -> (Option<usize>, Option<String>) {
659 match location_id.root() {
660 LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
661 LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
662 _ => panic!("unexpected location type"),
663 }
664}
665
666fn setup_location(
668 structure: &mut HydroGraphStructure,
669 metadata: &HydroIrMetadata,
670) -> Option<usize> {
671 let (location_id, location_type) = extract_location_id(&metadata.location_kind);
672 if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
673 structure.add_location(loc_id, loc_type);
674 }
675 location_id
676}
677
678fn add_edge_with_metadata(
681 structure: &mut HydroGraphStructure,
682 src_id: usize,
683 dst_id: usize,
684 src_metadata: Option<&HydroIrMetadata>,
685 dst_metadata: Option<&HydroIrMetadata>,
686 label: Option<String>,
687) {
688 let mut properties = HashSet::new();
689
690 if let Some(metadata) = src_metadata {
692 properties.extend(extract_edge_properties_from_collection_kind(
693 &metadata.collection_kind,
694 ));
695 }
696
697 if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
699 add_network_edge_tag(
700 &mut properties,
701 &src_meta.location_kind,
702 &dst_meta.location_kind,
703 );
704 }
705
706 if properties.is_empty() {
708 properties.insert(HydroEdgeProp::Stream);
709 }
710
711 structure.add_edge(src_id, dst_id, properties, label);
712}
713
714fn write_graph_structure<W>(
716 structure: &HydroGraphStructure,
717 graph_write: W,
718 config: &HydroWriteConfig,
719) -> Result<(), W::Err>
720where
721 W: HydroGraphWrite,
722{
723 let mut graph_write = graph_write;
724 graph_write.write_prologue()?;
726
727 for (&node_id, node) in &structure.nodes {
729 let (location_id, location_type) = if let Some(loc_id) = node.location {
730 (
731 Some(loc_id),
732 structure.locations.get(&loc_id).map(|s| s.as_str()),
733 )
734 } else {
735 (None, None)
736 };
737
738 graph_write.write_node_definition(
739 node_id,
740 &node.label,
741 node.node_type,
742 location_id,
743 location_type,
744 node.backtrace.as_ref(),
745 )?;
746 }
747
748 if config.show_location_groups {
750 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
751 for (&node_id, node) in &structure.nodes {
752 if let Some(location_id) = node.location {
753 nodes_by_location
754 .entry(location_id)
755 .or_default()
756 .push(node_id);
757 }
758 }
759
760 for (&location_id, node_ids) in &nodes_by_location {
761 if let Some(location_type) = structure.locations.get(&location_id) {
762 graph_write.write_location_start(location_id, location_type)?;
763 for &node_id in node_ids {
764 graph_write.write_node(node_id)?;
765 }
766 graph_write.write_location_end()?;
767 }
768 }
769 }
770
771 for edge in &structure.edges {
773 graph_write.write_edge(
774 edge.src,
775 edge.dst,
776 &edge.edge_properties,
777 edge.label.as_deref(),
778 )?;
779 }
780
781 graph_write.write_epilogue()?;
782 Ok(())
783}
784
785impl HydroRoot {
786 pub fn build_graph_structure(
788 &self,
789 structure: &mut HydroGraphStructure,
790 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
791 config: &HydroWriteConfig,
792 ) -> usize {
793 fn build_sink_node(
795 structure: &mut HydroGraphStructure,
796 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
797 config: &HydroWriteConfig,
798 input: &HydroNode,
799 sink_metadata: Option<&HydroIrMetadata>,
800 label: NodeLabel,
801 ) -> usize {
802 let input_id = input.build_graph_structure(structure, seen_tees, config);
803
804 let effective_metadata = if let Some(meta) = sink_metadata {
806 Some(meta)
807 } else {
808 match input {
809 HydroNode::Placeholder => None,
810 _ => Some(input.metadata()),
812 }
813 };
814
815 let location_id = effective_metadata.and_then(|m| setup_location(structure, m));
816 let sink_id = structure.add_node_with_backtrace(
817 label,
818 HydroNodeType::Sink,
819 location_id,
820 effective_metadata.map(|m| m.op.backtrace.clone()),
821 );
822
823 let input_metadata = input.metadata();
825 add_edge_with_metadata(
826 structure,
827 input_id,
828 sink_id,
829 Some(input_metadata),
830 sink_metadata,
831 None,
832 );
833
834 sink_id
835 }
836
837 match self {
838 HydroRoot::ForEach { f, input, .. } => build_sink_node(
840 structure,
841 seen_tees,
842 config,
843 input,
844 None,
845 NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
846 ),
847
848 HydroRoot::SendExternal {
849 to_external_id,
850 to_key,
851 input,
852 ..
853 } => build_sink_node(
854 structure,
855 seen_tees,
856 config,
857 input,
858 None,
859 NodeLabel::with_exprs(
860 format!("send_external({}:{})", to_external_id, to_key),
861 vec![],
862 ),
863 ),
864
865 HydroRoot::DestSink { sink, input, .. } => build_sink_node(
866 structure,
867 seen_tees,
868 config,
869 input,
870 None,
871 NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
872 ),
873
874 HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
875 structure,
876 seen_tees,
877 config,
878 input,
879 None,
880 NodeLabel::static_label(format!("cycle_sink({})", ident)),
881 ),
882 }
883 }
884}
885
886impl HydroNode {
887 pub fn build_graph_structure(
889 &self,
890 structure: &mut HydroGraphStructure,
891 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
892 config: &HydroWriteConfig,
893 ) -> usize {
894 use crate::location::dynamic::LocationId;
895
896 struct TransformParams<'a> {
900 structure: &'a mut HydroGraphStructure,
901 seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
902 config: &'a HydroWriteConfig,
903 input: &'a HydroNode,
904 metadata: &'a HydroIrMetadata,
905 op_name: String,
906 node_type: HydroNodeType,
907 }
908
909 fn build_simple_transform(params: TransformParams) -> usize {
911 let input_id = params.input.build_graph_structure(
912 params.structure,
913 params.seen_tees,
914 params.config,
915 );
916 let node_id = params.structure.add_node_with_metadata(
917 NodeLabel::Static(params.op_name.to_string()),
918 params.node_type,
919 params.metadata,
920 );
921
922 let input_metadata = params.input.metadata();
924 add_edge_with_metadata(
925 params.structure,
926 input_id,
927 node_id,
928 Some(input_metadata),
929 Some(params.metadata),
930 None,
931 );
932
933 node_id
934 }
935
936 fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
938 let input_id = params.input.build_graph_structure(
939 params.structure,
940 params.seen_tees,
941 params.config,
942 );
943 let node_id = params.structure.add_node_with_metadata(
944 NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
945 params.node_type,
946 params.metadata,
947 );
948
949 let input_metadata = params.input.metadata();
951 add_edge_with_metadata(
952 params.structure,
953 input_id,
954 node_id,
955 Some(input_metadata),
956 Some(params.metadata),
957 None,
958 );
959
960 node_id
961 }
962
963 fn build_dual_expr_transform(
965 params: TransformParams,
966 expr1: &DebugExpr,
967 expr2: &DebugExpr,
968 ) -> usize {
969 let input_id = params.input.build_graph_structure(
970 params.structure,
971 params.seen_tees,
972 params.config,
973 );
974 let node_id = params.structure.add_node_with_metadata(
975 NodeLabel::with_exprs(
976 params.op_name.to_string(),
977 vec![expr1.clone(), expr2.clone()],
978 ),
979 params.node_type,
980 params.metadata,
981 );
982
983 let input_metadata = params.input.metadata();
985 add_edge_with_metadata(
986 params.structure,
987 input_id,
988 node_id,
989 Some(input_metadata),
990 Some(params.metadata),
991 None,
992 );
993
994 node_id
995 }
996
997 fn build_source_node(
999 structure: &mut HydroGraphStructure,
1000 metadata: &HydroIrMetadata,
1001 label: String,
1002 ) -> usize {
1003 structure.add_node_with_metadata(
1004 NodeLabel::Static(label),
1005 HydroNodeType::Source,
1006 metadata,
1007 )
1008 }
1009
1010 match self {
1011 HydroNode::Placeholder => structure.add_node(
1012 NodeLabel::Static("PLACEHOLDER".to_string()),
1013 HydroNodeType::Transform,
1014 None,
1015 ),
1016
1017 HydroNode::Source {
1018 source, metadata, ..
1019 } => {
1020 let label = match source {
1021 HydroSource::Stream(expr) => format!("source_stream({})", expr),
1022 HydroSource::ExternalNetwork() => "external_network()".to_string(),
1023 HydroSource::Iter(expr) => format!("source_iter({})", expr),
1024 HydroSource::Spin() => "spin()".to_string(),
1025 HydroSource::ClusterMembers(location_id) => {
1026 format!(
1027 "source_stream(cluster_membership_stream({:?}))",
1028 location_id
1029 )
1030 }
1031 };
1032 build_source_node(structure, metadata, label)
1033 }
1034
1035 HydroNode::SingletonSource { value, metadata } => {
1036 let label = format!("singleton({})", value);
1037 build_source_node(structure, metadata, label)
1038 }
1039
1040 HydroNode::ExternalInput {
1041 from_external_id,
1042 from_key,
1043 metadata,
1044 ..
1045 } => build_source_node(
1046 structure,
1047 metadata,
1048 format!("external_input({}:{})", from_external_id, from_key),
1049 ),
1050
1051 HydroNode::CycleSource {
1052 ident, metadata, ..
1053 } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
1054
1055 HydroNode::Tee { inner, metadata } => {
1056 let ptr = inner.as_ptr();
1057 if let Some(&existing_id) = seen_tees.get(&ptr) {
1058 return existing_id;
1059 }
1060
1061 let input_id = inner
1062 .0
1063 .borrow()
1064 .build_graph_structure(structure, seen_tees, config);
1065 let tee_id = structure.add_node_with_metadata(
1066 NodeLabel::Static(extract_op_name(self.print_root())),
1067 HydroNodeType::Tee,
1068 metadata,
1069 );
1070
1071 seen_tees.insert(ptr, tee_id);
1072
1073 let inner_borrow = inner.0.borrow();
1075 let input_metadata = inner_borrow.metadata();
1076 add_edge_with_metadata(
1077 structure,
1078 input_id,
1079 tee_id,
1080 Some(input_metadata),
1081 Some(metadata),
1082 None,
1083 );
1084 drop(inner_borrow);
1085
1086 tee_id
1087 }
1088
1089 HydroNode::ObserveNonDet {
1091 inner, metadata, ..
1092 } => build_simple_transform(TransformParams {
1093 structure,
1094 seen_tees,
1095 config,
1096 input: inner,
1097 metadata,
1098 op_name: extract_op_name(self.print_root()),
1099 node_type: HydroNodeType::NonDeterministic,
1100 }),
1101
1102 HydroNode::Cast { inner, metadata }
1104 | HydroNode::DeferTick {
1105 input: inner,
1106 metadata,
1107 }
1108 | HydroNode::Enumerate {
1109 input: inner,
1110 metadata,
1111 ..
1112 }
1113 | HydroNode::Unique {
1114 input: inner,
1115 metadata,
1116 }
1117 | HydroNode::ResolveFutures {
1118 input: inner,
1119 metadata,
1120 }
1121 | HydroNode::ResolveFuturesOrdered {
1122 input: inner,
1123 metadata,
1124 } => build_simple_transform(TransformParams {
1125 structure,
1126 seen_tees,
1127 config,
1128 input: inner,
1129 metadata,
1130 op_name: extract_op_name(self.print_root()),
1131 node_type: HydroNodeType::Transform,
1132 }),
1133
1134 HydroNode::Sort {
1136 input: inner,
1137 metadata,
1138 } => build_simple_transform(TransformParams {
1139 structure,
1140 seen_tees,
1141 config,
1142 input: inner,
1143 metadata,
1144 op_name: extract_op_name(self.print_root()),
1145 node_type: HydroNodeType::Aggregation,
1146 }),
1147
1148 HydroNode::Map { f, input, metadata }
1150 | HydroNode::Filter { f, input, metadata }
1151 | HydroNode::FlatMap { f, input, metadata }
1152 | HydroNode::FilterMap { f, input, metadata }
1153 | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
1154 TransformParams {
1155 structure,
1156 seen_tees,
1157 config,
1158 input,
1159 metadata,
1160 op_name: extract_op_name(self.print_root()),
1161 node_type: HydroNodeType::Transform,
1162 },
1163 f,
1164 ),
1165
1166 HydroNode::Reduce { f, input, metadata }
1168 | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
1169 TransformParams {
1170 structure,
1171 seen_tees,
1172 config,
1173 input,
1174 metadata,
1175 op_name: extract_op_name(self.print_root()),
1176 node_type: HydroNodeType::Aggregation,
1177 },
1178 f,
1179 ),
1180
1181 HydroNode::Join {
1183 left,
1184 right,
1185 metadata,
1186 }
1187 | HydroNode::CrossProduct {
1188 left,
1189 right,
1190 metadata,
1191 }
1192 | HydroNode::CrossSingleton {
1193 left,
1194 right,
1195 metadata,
1196 } => {
1197 let left_id = left.build_graph_structure(structure, seen_tees, config);
1198 let right_id = right.build_graph_structure(structure, seen_tees, config);
1199 let node_id = structure.add_node_with_metadata(
1200 NodeLabel::Static(extract_op_name(self.print_root())),
1201 HydroNodeType::Join,
1202 metadata,
1203 );
1204
1205 let left_metadata = left.metadata();
1207 add_edge_with_metadata(
1208 structure,
1209 left_id,
1210 node_id,
1211 Some(left_metadata),
1212 Some(metadata),
1213 Some("left".to_string()),
1214 );
1215
1216 let right_metadata = right.metadata();
1218 add_edge_with_metadata(
1219 structure,
1220 right_id,
1221 node_id,
1222 Some(right_metadata),
1223 Some(metadata),
1224 Some("right".to_string()),
1225 );
1226
1227 node_id
1228 }
1229
1230 HydroNode::Difference {
1232 pos: left,
1233 neg: right,
1234 metadata,
1235 }
1236 | HydroNode::AntiJoin {
1237 pos: left,
1238 neg: right,
1239 metadata,
1240 } => {
1241 let left_id = left.build_graph_structure(structure, seen_tees, config);
1242 let right_id = right.build_graph_structure(structure, seen_tees, config);
1243 let node_id = structure.add_node_with_metadata(
1244 NodeLabel::Static(extract_op_name(self.print_root())),
1245 HydroNodeType::Join,
1246 metadata,
1247 );
1248
1249 let left_metadata = left.metadata();
1251 add_edge_with_metadata(
1252 structure,
1253 left_id,
1254 node_id,
1255 Some(left_metadata),
1256 Some(metadata),
1257 Some("pos".to_string()),
1258 );
1259
1260 let right_metadata = right.metadata();
1262 add_edge_with_metadata(
1263 structure,
1264 right_id,
1265 node_id,
1266 Some(right_metadata),
1267 Some(metadata),
1268 Some("neg".to_string()),
1269 );
1270
1271 node_id
1272 }
1273
1274 HydroNode::Fold {
1276 init,
1277 acc,
1278 input,
1279 metadata,
1280 }
1281 | HydroNode::FoldKeyed {
1282 init,
1283 acc,
1284 input,
1285 metadata,
1286 }
1287 | HydroNode::Scan {
1288 init,
1289 acc,
1290 input,
1291 metadata,
1292 } => {
1293 let node_type = HydroNodeType::Aggregation; build_dual_expr_transform(
1296 TransformParams {
1297 structure,
1298 seen_tees,
1299 config,
1300 input,
1301 metadata,
1302 op_name: extract_op_name(self.print_root()),
1303 node_type,
1304 },
1305 init,
1306 acc,
1307 )
1308 }
1309
1310 HydroNode::ReduceKeyedWatermark {
1312 f,
1313 input,
1314 watermark,
1315 metadata,
1316 } => {
1317 let input_id = input.build_graph_structure(structure, seen_tees, config);
1318 let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
1319 let location_id = setup_location(structure, metadata);
1320 let join_node_id = structure.add_node_with_backtrace(
1321 NodeLabel::Static(extract_op_name(self.print_root())),
1322 HydroNodeType::Join,
1323 location_id,
1324 Some(metadata.op.backtrace.clone()),
1325 );
1326
1327 let input_metadata = input.metadata();
1329 add_edge_with_metadata(
1330 structure,
1331 input_id,
1332 join_node_id,
1333 Some(input_metadata),
1334 Some(metadata),
1335 Some("input".to_string()),
1336 );
1337
1338 let watermark_metadata = watermark.metadata();
1340 add_edge_with_metadata(
1341 structure,
1342 watermark_id,
1343 join_node_id,
1344 Some(watermark_metadata),
1345 Some(metadata),
1346 Some("watermark".to_string()),
1347 );
1348
1349 let node_id = structure.add_node_with_backtrace(
1350 NodeLabel::with_exprs(
1351 extract_op_name(self.print_root()).to_string(),
1352 vec![f.clone()],
1353 ),
1354 HydroNodeType::Aggregation,
1355 location_id,
1356 Some(metadata.op.backtrace.clone()),
1357 );
1358
1359 let join_metadata = metadata; add_edge_with_metadata(
1362 structure,
1363 join_node_id,
1364 node_id,
1365 Some(join_metadata),
1366 Some(metadata),
1367 None,
1368 );
1369
1370 node_id
1371 }
1372
1373 HydroNode::Network {
1374 serialize_fn,
1375 deserialize_fn,
1376 input,
1377 metadata,
1378 ..
1379 } => {
1380 let input_id = input.build_graph_structure(structure, seen_tees, config);
1381 let _from_location_id = setup_location(structure, metadata);
1382
1383 let to_location_id = match metadata.location_kind.root() {
1384 LocationId::Process(id) => {
1385 structure.add_location(*id, "Process".to_string());
1386 Some(*id)
1387 }
1388 LocationId::Cluster(id) => {
1389 structure.add_location(*id, "Cluster".to_string());
1390 Some(*id)
1391 }
1392 _ => None,
1393 };
1394
1395 let mut label = "network(".to_string();
1396 if serialize_fn.is_some() {
1397 label.push_str("send");
1398 }
1399 if deserialize_fn.is_some() {
1400 if serialize_fn.is_some() {
1401 label.push_str(" + ");
1402 }
1403 label.push_str("recv");
1404 }
1405 label.push(')');
1406
1407 let network_id = structure.add_node_with_backtrace(
1408 NodeLabel::Static(label),
1409 HydroNodeType::Network,
1410 to_location_id,
1411 Some(metadata.op.backtrace.clone()),
1412 );
1413
1414 let input_metadata = input.metadata();
1416 add_edge_with_metadata(
1417 structure,
1418 input_id,
1419 network_id,
1420 Some(input_metadata),
1421 Some(metadata),
1422 Some(format!("to {:?}", to_location_id)),
1423 );
1424
1425 network_id
1426 }
1427
1428 HydroNode::Batch { inner, metadata } => build_simple_transform(TransformParams {
1430 structure,
1431 seen_tees,
1432 config,
1433 input: inner,
1434 metadata,
1435 op_name: extract_op_name(self.print_root()),
1436 node_type: HydroNodeType::NonDeterministic,
1437 }),
1438
1439 HydroNode::YieldConcat { inner, .. } => {
1440 inner.build_graph_structure(structure, seen_tees, config)
1442 }
1443
1444 HydroNode::BeginAtomic { inner, .. } => {
1445 inner.build_graph_structure(structure, seen_tees, config)
1446 }
1447
1448 HydroNode::EndAtomic { inner, .. } => {
1449 inner.build_graph_structure(structure, seen_tees, config)
1450 }
1451
1452 HydroNode::Chain {
1453 first,
1454 second,
1455 metadata,
1456 } => {
1457 let first_id = first.build_graph_structure(structure, seen_tees, config);
1458 let second_id = second.build_graph_structure(structure, seen_tees, config);
1459 let location_id = setup_location(structure, metadata);
1460 let chain_id = structure.add_node_with_backtrace(
1461 NodeLabel::Static(extract_op_name(self.print_root())),
1462 HydroNodeType::Transform,
1463 location_id,
1464 Some(metadata.op.backtrace.clone()),
1465 );
1466
1467 let first_metadata = first.metadata();
1469 add_edge_with_metadata(
1470 structure,
1471 first_id,
1472 chain_id,
1473 Some(first_metadata),
1474 Some(metadata),
1475 Some("first".to_string()),
1476 );
1477
1478 let second_metadata = second.metadata();
1480 add_edge_with_metadata(
1481 structure,
1482 second_id,
1483 chain_id,
1484 Some(second_metadata),
1485 Some(metadata),
1486 Some("second".to_string()),
1487 );
1488
1489 chain_id
1490 }
1491
1492 HydroNode::ChainFirst {
1493 first,
1494 second,
1495 metadata,
1496 } => {
1497 let first_id = first.build_graph_structure(structure, seen_tees, config);
1498 let second_id = second.build_graph_structure(structure, seen_tees, config);
1499 let location_id = setup_location(structure, metadata);
1500 let chain_id = structure.add_node_with_backtrace(
1501 NodeLabel::Static(extract_op_name(self.print_root())),
1502 HydroNodeType::Transform,
1503 location_id,
1504 Some(metadata.op.backtrace.clone()),
1505 );
1506
1507 let first_metadata = first.metadata();
1509 add_edge_with_metadata(
1510 structure,
1511 first_id,
1512 chain_id,
1513 Some(first_metadata),
1514 Some(metadata),
1515 Some("first".to_string()),
1516 );
1517
1518 let second_metadata = second.metadata();
1520 add_edge_with_metadata(
1521 structure,
1522 second_id,
1523 chain_id,
1524 Some(second_metadata),
1525 Some(metadata),
1526 Some("second".to_string()),
1527 );
1528
1529 chain_id
1530 }
1531
1532 HydroNode::Counter {
1533 tag: _,
1534 prefix: _,
1535 duration,
1536 input,
1537 metadata,
1538 } => build_single_expr_transform(
1539 TransformParams {
1540 structure,
1541 seen_tees,
1542 config,
1543 input,
1544 metadata,
1545 op_name: extract_op_name(self.print_root()),
1546 node_type: HydroNodeType::Transform,
1547 },
1548 duration,
1549 ),
1550 }
1551 }
1552}
1553
/// Generates a public `$name(roots, config) -> String` convenience function.
///
/// The generated function allocates an empty `String`, hands a mutable
/// reference to it to `$write_fn` together with `roots` and `config`, and
/// returns the filled string. If `$write_fn` returns an `Err`, the generated
/// function panics via `unwrap`.
macro_rules! render_hydro_ir {
    ($name:ident, $write_fn:ident) => {
        /// Renders the given Hydro IR roots into a freshly allocated `String`.
        pub fn $name(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
            let mut rendered = String::new();
            let outcome = $write_fn(&mut rendered, roots, config);
            outcome.unwrap();
            rendered
        }
    };
}
1565
/// Generates a public `$name(output, roots, config) -> std::fmt::Result`
/// function.
///
/// The generated function builds a writer of type `$writer_type` by calling
/// `$constructor(output, config)` and then passes a mutable reference to that
/// writer, along with `roots` and `config`, to `write_hydro_ir_graph`,
/// returning its result unchanged.
macro_rules! write_hydro_ir {
    ($name:ident, $writer_type:ty, $constructor:expr) => {
        /// Writes the given Hydro IR roots into `output`.
        pub fn $name(
            output: impl std::fmt::Write,
            roots: &[HydroRoot],
            config: &HydroWriteConfig,
        ) -> std::fmt::Result {
            // The type annotation pins the concrete writer so `_` placeholders
            // in `$writer_type` are inferred from `output`.
            let mut backend: $writer_type = $constructor(output, config);
            write_hydro_ir_graph(&mut backend, roots, config)
        }
    };
}
1579
// Mermaid output: `render_hydro_ir_mermaid` returns a `String`, while
// `write_hydro_ir_mermaid` writes into any `std::fmt::Write` using a
// `HydroMermaid` writer built with `HydroMermaid::new_with_config`.
render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
write_hydro_ir!(
    write_hydro_ir_mermaid,
    HydroMermaid<_>,
    HydroMermaid::new_with_config
);

// DOT output: same pair of entry points, backed by a `HydroDot` writer built
// with `HydroDot::new_with_config`.
render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);

// `render_hydro_ir_hydroscope` is generated from `write_hydro_ir_json`, so it
// produces exactly the same string as `render_hydro_ir_json` below.
render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);

// JSON output: backed by a `HydroJson` writer built with `HydroJson::new`,
// which is called with `(output, config)` like the other constructors.
render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
1596
1597fn write_hydro_ir_graph<W>(
1598 graph_write: W,
1599 roots: &[HydroRoot],
1600 config: &HydroWriteConfig,
1601) -> Result<(), W::Err>
1602where
1603 W: HydroGraphWrite,
1604{
1605 let mut structure = HydroGraphStructure::new();
1606 let mut seen_tees = HashMap::new();
1607
1608 for leaf in roots {
1610 leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1611 }
1612
1613 write_graph_structure(&structure, graph_write, config)
1614}