1use std::collections::{HashMap, HashSet};
2use std::error::Error;
3use std::fmt::{Display, Write};
4use std::num::ParseIntError;
5use std::sync::OnceLock;
6
7use auto_impl::auto_impl;
8use slotmap::{Key, SecondaryMap, SlotMap};
9
10pub use super::graphviz::{HydroDot, escape_dot};
11pub use super::json::HydroJson;
12pub use super::mermaid::{HydroMermaid, escape_mermaid};
14use crate::compile::ir::backtrace::Backtrace;
15use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
16use crate::location::dynamic::LocationId;
17use crate::location::{LocationKey, LocationType};
18
19#[derive(Debug, Clone)]
21pub enum NodeLabel {
22 Static(String),
24 WithExprs {
26 op_name: String,
27 exprs: Vec<DebugExpr>,
28 },
29}
30
31impl NodeLabel {
32 pub fn static_label(s: String) -> Self {
34 Self::Static(s)
35 }
36
37 pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
39 Self::WithExprs { op_name, exprs }
40 }
41}
42
43impl Display for NodeLabel {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 match self {
46 Self::Static(s) => write!(f, "{}", s),
47 Self::WithExprs { op_name, exprs } => {
48 if exprs.is_empty() {
49 write!(f, "{}()", op_name)
50 } else {
51 let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
52 write!(f, "{}({})", op_name, expr_strs.join(", "))
53 }
54 }
55 }
56 }
57}
58
59pub struct IndentedGraphWriter<'a, W> {
62 pub write: W,
63 pub indent: usize,
64 pub config: HydroWriteConfig<'a>,
65}
66
67impl<'a, W> IndentedGraphWriter<'a, W> {
68 pub fn new(write: W) -> Self {
70 Self {
71 write,
72 indent: 0,
73 config: HydroWriteConfig::default(),
74 }
75 }
76
77 pub fn new_with_config(write: W, config: HydroWriteConfig<'a>) -> Self {
79 Self {
80 write,
81 indent: 0,
82 config,
83 }
84 }
85}
86
87impl<W: Write> IndentedGraphWriter<'_, W> {
88 pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
90 writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
91 }
92}
93
94pub type GraphWriteError = std::fmt::Error;
96
97#[auto_impl(&mut, Box)]
99pub trait HydroGraphWrite {
100 type Err: Error;
102
103 fn write_prologue(&mut self) -> Result<(), Self::Err>;
105
106 fn write_node_definition(
108 &mut self,
109 node_id: VizNodeKey,
110 node_label: &NodeLabel,
111 node_type: HydroNodeType,
112 location_key: Option<LocationKey>,
113 location_type: Option<LocationType>,
114 backtrace: Option<&Backtrace>,
115 ) -> Result<(), Self::Err>;
116
117 fn write_edge(
119 &mut self,
120 src_id: VizNodeKey,
121 dst_id: VizNodeKey,
122 edge_properties: &HashSet<HydroEdgeProp>,
123 label: Option<&str>,
124 ) -> Result<(), Self::Err>;
125
126 fn write_location_start(
128 &mut self,
129 location_key: LocationKey,
130 location_type: LocationType,
131 ) -> Result<(), Self::Err>;
132
133 fn write_node(&mut self, node_id: VizNodeKey) -> Result<(), Self::Err>;
135
136 fn write_location_end(&mut self) -> Result<(), Self::Err>;
138
139 fn write_epilogue(&mut self) -> Result<(), Self::Err>;
141}
142
143pub mod node_type_utils {
145 use super::HydroNodeType;
146
147 const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
149 (HydroNodeType::Source, "Source"),
150 (HydroNodeType::Transform, "Transform"),
151 (HydroNodeType::Join, "Join"),
152 (HydroNodeType::Aggregation, "Aggregation"),
153 (HydroNodeType::Network, "Network"),
154 (HydroNodeType::Sink, "Sink"),
155 (HydroNodeType::Tee, "Tee"),
156 (HydroNodeType::NonDeterministic, "NonDeterministic"),
157 ];
158
159 pub fn to_string(node_type: HydroNodeType) -> &'static str {
161 NODE_TYPE_DATA
162 .iter()
163 .find(|(nt, _)| *nt == node_type)
164 .map(|(_, name)| *name)
165 .unwrap_or("Unknown")
166 }
167
168 pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
170 NODE_TYPE_DATA.to_vec()
171 }
172}
173
174#[derive(Debug, Clone, Copy, PartialEq, Eq)]
176pub enum HydroNodeType {
177 Source,
178 Transform,
179 Join,
180 Aggregation,
181 Network,
182 Sink,
183 Tee,
184 NonDeterministic,
185}
186
187#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
189pub enum HydroEdgeProp {
190 Bounded,
191 Unbounded,
192 TotalOrder,
193 NoOrder,
194 Keyed,
195 Stream,
197 KeyedSingleton,
198 KeyedStream,
199 Singleton,
200 Optional,
201 Network,
202 Cycle,
203}
204
205#[derive(Debug, Clone, PartialEq, Eq)]
208pub struct UnifiedEdgeStyle {
209 pub line_pattern: LinePattern,
211 pub line_width: u8,
213 pub arrowhead: ArrowheadStyle,
215 pub line_style: LineStyle,
217 pub halo: HaloStyle,
219 pub waviness: WavinessStyle,
221 pub animation: AnimationStyle,
223 pub color: &'static str,
225}
226
227#[derive(Debug, Clone, Copy, PartialEq, Eq)]
228pub enum LinePattern {
229 Solid,
230 Dotted,
231 Dashed,
232}
233
234#[derive(Debug, Clone, Copy, PartialEq, Eq)]
235pub enum ArrowheadStyle {
236 TriangleFilled,
237 CircleFilled,
238 DiamondOpen,
239 Default,
240}
241
242#[derive(Debug, Clone, Copy, PartialEq, Eq)]
243pub enum LineStyle {
244 Single,
246 HashMarks,
248}
249
250#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251pub enum HaloStyle {
252 None,
253 LightBlue,
254}
255
256#[derive(Debug, Clone, Copy, PartialEq, Eq)]
257pub enum WavinessStyle {
258 None,
259 Wavy,
260}
261
262#[derive(Debug, Clone, Copy, PartialEq, Eq)]
263pub enum AnimationStyle {
264 Static,
265 Animated,
266}
267
268impl Default for UnifiedEdgeStyle {
269 fn default() -> Self {
270 Self {
271 line_pattern: LinePattern::Solid,
272 line_width: 1,
273 arrowhead: ArrowheadStyle::Default,
274 line_style: LineStyle::Single,
275 halo: HaloStyle::None,
276 waviness: WavinessStyle::None,
277 animation: AnimationStyle::Static,
278 color: "#666666",
279 }
280 }
281}
282
283pub fn get_unified_edge_style(
296 edge_properties: &HashSet<HydroEdgeProp>,
297 src_location: Option<usize>,
298 dst_location: Option<usize>,
299) -> UnifiedEdgeStyle {
300 let mut style = UnifiedEdgeStyle::default();
301
302 let is_network = edge_properties.contains(&HydroEdgeProp::Network)
304 || (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
305
306 if is_network {
307 style.line_pattern = LinePattern::Dashed;
308 style.animation = AnimationStyle::Animated;
309 } else {
310 style.line_pattern = LinePattern::Solid;
311 style.animation = AnimationStyle::Static;
312 }
313
314 if edge_properties.contains(&HydroEdgeProp::Unbounded) {
316 style.halo = HaloStyle::LightBlue;
317 } else {
318 style.halo = HaloStyle::None;
319 }
320
321 if edge_properties.contains(&HydroEdgeProp::Stream) {
323 style.arrowhead = ArrowheadStyle::TriangleFilled;
324 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
326 style.arrowhead = ArrowheadStyle::TriangleFilled;
327 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
329 style.arrowhead = ArrowheadStyle::TriangleFilled;
330 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
332 style.arrowhead = ArrowheadStyle::CircleFilled;
333 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Optional) {
335 style.arrowhead = ArrowheadStyle::DiamondOpen;
336 style.color = "#6b7280"; }
338
339 if edge_properties.contains(&HydroEdgeProp::Keyed) {
341 style.line_style = LineStyle::HashMarks; } else {
343 style.line_style = LineStyle::Single;
344 }
345
346 if edge_properties.contains(&HydroEdgeProp::NoOrder) {
348 style.waviness = WavinessStyle::Wavy;
349 } else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
350 style.waviness = WavinessStyle::None;
351 }
352
353 style
354}
355
356pub fn extract_edge_properties_from_collection_kind(
360 collection_kind: &crate::compile::ir::CollectionKind,
361) -> HashSet<HydroEdgeProp> {
362 use crate::compile::ir::CollectionKind;
363
364 let mut properties = HashSet::new();
365
366 match collection_kind {
367 CollectionKind::Stream { bound, order, .. } => {
368 properties.insert(HydroEdgeProp::Stream);
369 add_bound_property(&mut properties, bound);
370 add_order_property(&mut properties, order);
371 }
372 CollectionKind::KeyedStream {
373 bound, value_order, ..
374 } => {
375 properties.insert(HydroEdgeProp::KeyedStream);
376 properties.insert(HydroEdgeProp::Keyed);
377 add_bound_property(&mut properties, bound);
378 add_order_property(&mut properties, value_order);
379 }
380 CollectionKind::Singleton { bound, .. } => {
381 properties.insert(HydroEdgeProp::Singleton);
382 add_singleton_bound_property(&mut properties, bound);
383 properties.insert(HydroEdgeProp::TotalOrder);
385 }
386 CollectionKind::Optional { bound, .. } => {
387 properties.insert(HydroEdgeProp::Optional);
388 add_bound_property(&mut properties, bound);
389 properties.insert(HydroEdgeProp::TotalOrder);
391 }
392 CollectionKind::KeyedSingleton { bound, .. } => {
393 properties.insert(HydroEdgeProp::Singleton);
394 properties.insert(HydroEdgeProp::Keyed);
395 add_keyed_singleton_bound_property(&mut properties, bound);
397 properties.insert(HydroEdgeProp::TotalOrder);
398 }
399 }
400
401 properties
402}
403
404fn add_bound_property(
406 properties: &mut HashSet<HydroEdgeProp>,
407 bound: &crate::compile::ir::BoundKind,
408) {
409 use crate::compile::ir::BoundKind;
410
411 match bound {
412 BoundKind::Bounded => {
413 properties.insert(HydroEdgeProp::Bounded);
414 }
415 BoundKind::Unbounded => {
416 properties.insert(HydroEdgeProp::Unbounded);
417 }
418 }
419}
420
421fn add_singleton_bound_property(
423 properties: &mut HashSet<HydroEdgeProp>,
424 bound: &crate::compile::ir::SingletonBoundKind,
425) {
426 use crate::compile::ir::SingletonBoundKind;
427
428 match bound {
429 SingletonBoundKind::Bounded => {
430 properties.insert(HydroEdgeProp::Bounded);
431 }
432 SingletonBoundKind::Monotonic | SingletonBoundKind::Unbounded => {
433 properties.insert(HydroEdgeProp::Unbounded);
434 }
435 }
436}
437
438fn add_keyed_singleton_bound_property(
440 properties: &mut HashSet<HydroEdgeProp>,
441 bound: &crate::compile::ir::KeyedSingletonBoundKind,
442) {
443 use crate::compile::ir::KeyedSingletonBoundKind;
444
445 match bound {
446 KeyedSingletonBoundKind::Bounded => {
447 properties.insert(HydroEdgeProp::Bounded);
448 }
449 KeyedSingletonBoundKind::BoundedValue
450 | KeyedSingletonBoundKind::MonotonicValue
451 | KeyedSingletonBoundKind::Unbounded => {
452 properties.insert(HydroEdgeProp::Unbounded);
453 }
454 }
455}
456
457fn add_order_property(
459 properties: &mut HashSet<HydroEdgeProp>,
460 order: &crate::compile::ir::StreamOrder,
461) {
462 use crate::compile::ir::StreamOrder;
463
464 match order {
465 StreamOrder::TotalOrder => {
466 properties.insert(HydroEdgeProp::TotalOrder);
467 }
468 StreamOrder::NoOrder => {
469 properties.insert(HydroEdgeProp::NoOrder);
470 }
471 }
472}
473
474pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
477 src_location.root() != dst_location.root()
479}
480
481pub fn add_network_edge_tag(
483 properties: &mut HashSet<HydroEdgeProp>,
484 src_location: &LocationId,
485 dst_location: &LocationId,
486) {
487 if is_network_edge(src_location, dst_location) {
488 properties.insert(HydroEdgeProp::Network);
489 }
490}
491
492#[derive(Debug, Clone, Copy)]
494pub struct HydroWriteConfig<'a> {
495 pub show_metadata: bool,
496 pub show_location_groups: bool,
497 pub use_short_labels: bool,
498 pub location_names: &'a SecondaryMap<LocationKey, String>,
499}
500
501impl Default for HydroWriteConfig<'_> {
502 fn default() -> Self {
503 static EMPTY: OnceLock<SecondaryMap<LocationKey, String>> = OnceLock::new();
504 Self {
505 show_metadata: false,
506 show_location_groups: true,
507 use_short_labels: true, location_names: EMPTY.get_or_init(SecondaryMap::new),
509 }
510 }
511}
512
513#[derive(Clone)]
515pub struct HydroGraphNode {
516 pub label: NodeLabel,
517 pub node_type: HydroNodeType,
518 pub location_key: Option<LocationKey>,
519 pub backtrace: Option<Backtrace>,
520}
521
522slotmap::new_key_type! {
523 pub struct VizNodeKey;
527}
528
529impl Display for VizNodeKey {
530 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
531 write!(f, "viz{:?}", self.data()) }
533}
534
535impl std::str::FromStr for VizNodeKey {
538 type Err = Option<ParseIntError>;
539
540 fn from_str(s: &str) -> Result<Self, Self::Err> {
541 let nvn = s.strip_prefix("viz").ok_or(None)?;
542 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
543 let idx: u64 = idx.parse()?;
544 let ver: u64 = ver.parse()?;
545 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
546 }
547}
548
549impl VizNodeKey {
550 #[cfg(test)]
552 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x0000008F00000001)); #[cfg(test)]
556 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x0000008F00000002)); }
558
559#[derive(Debug, Clone)]
561pub struct HydroGraphEdge {
562 pub src: VizNodeKey,
563 pub dst: VizNodeKey,
564 pub edge_properties: HashSet<HydroEdgeProp>,
565 pub label: Option<String>,
566}
567
568#[derive(Default)]
570pub struct HydroGraphStructure {
571 pub nodes: SlotMap<VizNodeKey, HydroGraphNode>,
572 pub edges: Vec<HydroGraphEdge>,
573 pub locations: SecondaryMap<LocationKey, LocationType>,
574}
575
576impl HydroGraphStructure {
577 pub fn new() -> Self {
578 Self::default()
579 }
580
581 pub fn add_node(
582 &mut self,
583 label: NodeLabel,
584 node_type: HydroNodeType,
585 location_key: Option<LocationKey>,
586 ) -> VizNodeKey {
587 self.add_node_with_backtrace(label, node_type, location_key, None)
588 }
589
590 pub fn add_node_with_backtrace(
591 &mut self,
592 label: NodeLabel,
593 node_type: HydroNodeType,
594 location_key: Option<LocationKey>,
595 backtrace: Option<Backtrace>,
596 ) -> VizNodeKey {
597 self.nodes.insert(HydroGraphNode {
598 label,
599 node_type,
600 location_key,
601 backtrace,
602 })
603 }
604
605 pub fn add_node_with_metadata(
607 &mut self,
608 label: NodeLabel,
609 node_type: HydroNodeType,
610 metadata: &HydroIrMetadata,
611 ) -> VizNodeKey {
612 let location_key = Some(setup_location(self, metadata));
613 let backtrace = Some(metadata.op.backtrace.clone());
614 self.add_node_with_backtrace(label, node_type, location_key, backtrace)
615 }
616
617 pub fn add_edge(
618 &mut self,
619 src: VizNodeKey,
620 dst: VizNodeKey,
621 edge_properties: HashSet<HydroEdgeProp>,
622 label: Option<String>,
623 ) {
624 self.edges.push(HydroGraphEdge {
625 src,
626 dst,
627 edge_properties,
628 label,
629 });
630 }
631
632 pub fn add_edge_single(
634 &mut self,
635 src: VizNodeKey,
636 dst: VizNodeKey,
637 edge_type: HydroEdgeProp,
638 label: Option<String>,
639 ) {
640 let mut properties = HashSet::new();
641 properties.insert(edge_type);
642 self.edges.push(HydroGraphEdge {
643 src,
644 dst,
645 edge_properties: properties,
646 label,
647 });
648 }
649
650 pub fn add_location(&mut self, location_key: LocationKey, location_type: LocationType) {
651 self.locations.insert(location_key, location_type);
652 }
653}
654
655pub fn extract_op_name(full_label: String) -> String {
657 full_label
658 .split('(')
659 .next()
660 .unwrap_or("unknown")
661 .to_lowercase()
662}
663
664pub fn extract_short_label(full_label: &str) -> String {
666 if let Some(op_name) = full_label.split('(').next() {
668 let base_name = op_name.to_lowercase();
669 match base_name.as_str() {
670 "source" => {
672 if full_label.contains("Iter") {
673 "source_iter".to_owned()
674 } else if full_label.contains("Stream") {
675 "source_stream".to_owned()
676 } else if full_label.contains("ExternalNetwork") {
677 "external_network".to_owned()
678 } else if full_label.contains("Spin") {
679 "spin".to_owned()
680 } else {
681 "source".to_owned()
682 }
683 }
684 "network" => {
685 if full_label.contains("deser") {
686 "network(recv)".to_owned()
687 } else if full_label.contains("ser") {
688 "network(send)".to_owned()
689 } else {
690 "network".to_owned()
691 }
692 }
693 _ => base_name,
695 }
696 } else {
697 if full_label.len() > 20 {
699 format!("{}...", &full_label[..17])
700 } else {
701 full_label.to_owned()
702 }
703 }
704}
705
706fn setup_location(structure: &mut HydroGraphStructure, metadata: &HydroIrMetadata) -> LocationKey {
708 let root = metadata.location_id.root();
709 let location_key = root.key();
710 let location_type = root.location_type().unwrap();
711 structure.add_location(location_key, location_type);
712 location_key
713}
714
715fn add_edge_with_metadata(
718 structure: &mut HydroGraphStructure,
719 src_id: VizNodeKey,
720 dst_id: VizNodeKey,
721 src_metadata: Option<&HydroIrMetadata>,
722 dst_metadata: Option<&HydroIrMetadata>,
723 label: Option<String>,
724) {
725 let mut properties = HashSet::new();
726
727 if let Some(metadata) = src_metadata {
729 properties.extend(extract_edge_properties_from_collection_kind(
730 &metadata.collection_kind,
731 ));
732 }
733
734 if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
736 add_network_edge_tag(
737 &mut properties,
738 &src_meta.location_id,
739 &dst_meta.location_id,
740 );
741 }
742
743 if properties.is_empty() {
745 properties.insert(HydroEdgeProp::Stream);
746 }
747
748 structure.add_edge(src_id, dst_id, properties, label);
749}
750
751fn write_graph_structure<W>(
753 structure: &HydroGraphStructure,
754 graph_write: W,
755 config: HydroWriteConfig<'_>,
756) -> Result<(), W::Err>
757where
758 W: HydroGraphWrite,
759{
760 let mut graph_write = graph_write;
761 graph_write.write_prologue()?;
763
764 for (node_id, node) in structure.nodes.iter() {
766 let location_type = node
767 .location_key
768 .and_then(|loc_key| structure.locations.get(loc_key))
769 .copied();
770
771 graph_write.write_node_definition(
772 node_id,
773 &node.label,
774 node.node_type,
775 node.location_key,
776 location_type,
777 node.backtrace.as_ref(),
778 )?;
779 }
780
781 if config.show_location_groups {
783 let mut nodes_by_location = SecondaryMap::<LocationKey, Vec<VizNodeKey>>::new();
784 for (node_id, node) in structure.nodes.iter() {
785 if let Some(location_key) = node.location_key {
786 nodes_by_location
787 .entry(location_key)
788 .expect("location was removed")
789 .or_default()
790 .push(node_id);
791 }
792 }
793
794 for (location_key, node_ids) in nodes_by_location.iter() {
795 if let Some(&location_type) = structure.locations.get(location_key) {
796 graph_write.write_location_start(location_key, location_type)?;
797 for &node_id in node_ids.iter() {
798 graph_write.write_node(node_id)?;
799 }
800 graph_write.write_location_end()?;
801 }
802 }
803 }
804
805 for edge in structure.edges.iter() {
807 graph_write.write_edge(
808 edge.src,
809 edge.dst,
810 &edge.edge_properties,
811 edge.label.as_deref(),
812 )?;
813 }
814
815 graph_write.write_epilogue()?;
816 Ok(())
817}
818
819impl HydroRoot {
820 pub fn build_graph_structure(
822 &self,
823 structure: &mut HydroGraphStructure,
824 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
825 config: HydroWriteConfig<'_>,
826 ) -> VizNodeKey {
827 fn build_sink_node(
829 structure: &mut HydroGraphStructure,
830 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
831 config: HydroWriteConfig<'_>,
832 input: &HydroNode,
833 sink_metadata: Option<&HydroIrMetadata>,
834 label: NodeLabel,
835 ) -> VizNodeKey {
836 let input_id = input.build_graph_structure(structure, seen_tees, config);
837
838 let effective_metadata = if let Some(meta) = sink_metadata {
840 Some(meta)
841 } else {
842 match input {
843 HydroNode::Placeholder => None,
844 _ => Some(input.metadata()),
846 }
847 };
848
849 let location_key = effective_metadata.map(|m| setup_location(structure, m));
850 let sink_id = structure.add_node_with_backtrace(
851 label,
852 HydroNodeType::Sink,
853 location_key,
854 effective_metadata.map(|m| m.op.backtrace.clone()),
855 );
856
857 let input_metadata = input.metadata();
859 add_edge_with_metadata(
860 structure,
861 input_id,
862 sink_id,
863 Some(input_metadata),
864 sink_metadata,
865 None,
866 );
867
868 sink_id
869 }
870
871 match self {
872 HydroRoot::ForEach { f, input, .. } => build_sink_node(
874 structure,
875 seen_tees,
876 config,
877 input,
878 None,
879 NodeLabel::with_exprs("for_each".to_owned(), vec![f.expr.clone()]),
880 ),
881
882 HydroRoot::SendExternal {
883 to_external_key,
884 to_port_id,
885 input,
886 ..
887 } => build_sink_node(
888 structure,
889 seen_tees,
890 config,
891 input,
892 None,
893 NodeLabel::with_exprs(
894 format!("send_external({}:{})", to_external_key, to_port_id),
895 vec![],
896 ),
897 ),
898
899 HydroRoot::DestSink { sink, input, .. } => build_sink_node(
900 structure,
901 seen_tees,
902 config,
903 input,
904 None,
905 NodeLabel::with_exprs("dest_sink".to_owned(), vec![sink.clone()]),
906 ),
907
908 HydroRoot::CycleSink {
909 cycle_id, input, ..
910 } => build_sink_node(
911 structure,
912 seen_tees,
913 config,
914 input,
915 None,
916 NodeLabel::static_label(format!("cycle_sink({})", cycle_id)),
917 ),
918
919 HydroRoot::EmbeddedOutput { ident, input, .. } => build_sink_node(
920 structure,
921 seen_tees,
922 config,
923 input,
924 None,
925 NodeLabel::static_label(format!("embedded_output({})", ident)),
926 ),
927
928 HydroRoot::Null { input, .. } => build_sink_node(
929 structure,
930 seen_tees,
931 config,
932 input,
933 None,
934 NodeLabel::static_label("null".to_owned()),
935 ),
936 }
937 }
938}
939
940impl HydroNode {
941 pub fn build_graph_structure(
943 &self,
944 structure: &mut HydroGraphStructure,
945 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
946 config: HydroWriteConfig<'_>,
947 ) -> VizNodeKey {
948 struct TransformParams<'a> {
952 structure: &'a mut HydroGraphStructure,
953 seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
954 config: HydroWriteConfig<'a>,
955 input: &'a HydroNode,
956 metadata: &'a HydroIrMetadata,
957 op_name: String,
958 node_type: HydroNodeType,
959 }
960
961 fn build_simple_transform(params: TransformParams) -> VizNodeKey {
963 let input_id = params.input.build_graph_structure(
964 params.structure,
965 params.seen_tees,
966 params.config,
967 );
968 let node_id = params.structure.add_node_with_metadata(
969 NodeLabel::Static(params.op_name.to_string()),
970 params.node_type,
971 params.metadata,
972 );
973
974 let input_metadata = params.input.metadata();
976 add_edge_with_metadata(
977 params.structure,
978 input_id,
979 node_id,
980 Some(input_metadata),
981 Some(params.metadata),
982 None,
983 );
984
985 node_id
986 }
987
988 fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> VizNodeKey {
990 let input_id = params.input.build_graph_structure(
991 params.structure,
992 params.seen_tees,
993 params.config,
994 );
995 let node_id = params.structure.add_node_with_metadata(
996 NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
997 params.node_type,
998 params.metadata,
999 );
1000
1001 let input_metadata = params.input.metadata();
1003 add_edge_with_metadata(
1004 params.structure,
1005 input_id,
1006 node_id,
1007 Some(input_metadata),
1008 Some(params.metadata),
1009 None,
1010 );
1011
1012 node_id
1013 }
1014
1015 fn build_dual_expr_transform(
1017 params: TransformParams,
1018 expr1: &DebugExpr,
1019 expr2: &DebugExpr,
1020 ) -> VizNodeKey {
1021 let input_id = params.input.build_graph_structure(
1022 params.structure,
1023 params.seen_tees,
1024 params.config,
1025 );
1026 let node_id = params.structure.add_node_with_metadata(
1027 NodeLabel::with_exprs(
1028 params.op_name.to_string(),
1029 vec![expr1.clone(), expr2.clone()],
1030 ),
1031 params.node_type,
1032 params.metadata,
1033 );
1034
1035 let input_metadata = params.input.metadata();
1037 add_edge_with_metadata(
1038 params.structure,
1039 input_id,
1040 node_id,
1041 Some(input_metadata),
1042 Some(params.metadata),
1043 None,
1044 );
1045
1046 node_id
1047 }
1048
1049 fn build_source_node(
1051 structure: &mut HydroGraphStructure,
1052 metadata: &HydroIrMetadata,
1053 label: String,
1054 ) -> VizNodeKey {
1055 structure.add_node_with_metadata(
1056 NodeLabel::Static(label),
1057 HydroNodeType::Source,
1058 metadata,
1059 )
1060 }
1061
1062 match self {
1063 HydroNode::Placeholder => structure.add_node(
1064 NodeLabel::Static("PLACEHOLDER".to_owned()),
1065 HydroNodeType::Transform,
1066 None,
1067 ),
1068
1069 HydroNode::Source {
1070 source, metadata, ..
1071 } => {
1072 let label = match source {
1073 HydroSource::Stream(expr) => format!("source_stream({})", expr),
1074 HydroSource::ExternalNetwork() => "external_network()".to_owned(),
1075 HydroSource::Iter(expr) => format!("source_iter({})", expr),
1076 HydroSource::Spin() => "spin()".to_owned(),
1077 HydroSource::ClusterMembers(location_id, _) => {
1078 format!(
1079 "source_stream(cluster_membership_stream({:?}))",
1080 location_id
1081 )
1082 }
1083 HydroSource::Embedded(ident) => {
1084 format!("embedded_input({})", ident)
1085 }
1086 HydroSource::EmbeddedSingleton(ident) => {
1087 format!("embedded_singleton_input({})", ident)
1088 }
1089 };
1090 build_source_node(structure, metadata, label)
1091 }
1092
1093 HydroNode::SingletonSource {
1094 value,
1095 first_tick_only,
1096 metadata,
1097 } => {
1098 let label = if *first_tick_only {
1099 format!("singleton_first_tick({})", value)
1100 } else {
1101 format!("singleton({})", value)
1102 };
1103 build_source_node(structure, metadata, label)
1104 }
1105
1106 HydroNode::ExternalInput {
1107 from_external_key,
1108 from_port_id,
1109 metadata,
1110 ..
1111 } => build_source_node(
1112 structure,
1113 metadata,
1114 format!("external_input({}:{})", from_external_key, from_port_id),
1115 ),
1116
1117 HydroNode::CycleSource {
1118 cycle_id, metadata, ..
1119 } => build_source_node(structure, metadata, format!("cycle_source({})", cycle_id)),
1120
1121 HydroNode::Tee { inner, metadata }
1122 | HydroNode::Reference {
1123 inner, metadata, ..
1124 } => {
1125 let ptr = inner.as_ptr();
1126 if let Some(&existing_id) = seen_tees.get(&ptr) {
1127 return existing_id;
1128 }
1129
1130 let input_id = inner
1131 .0
1132 .borrow()
1133 .build_graph_structure(structure, seen_tees, config);
1134 let node_type = if matches!(self, HydroNode::Reference { .. }) {
1135 HydroNodeType::Aggregation
1136 } else {
1137 HydroNodeType::Tee
1138 };
1139 let tee_id = structure.add_node_with_metadata(
1140 NodeLabel::Static(extract_op_name(self.print_root())),
1141 node_type,
1142 metadata,
1143 );
1144
1145 seen_tees.insert(ptr, tee_id);
1146
1147 let inner_borrow = inner.0.borrow();
1149 let input_metadata = inner_borrow.metadata();
1150 add_edge_with_metadata(
1151 structure,
1152 input_id,
1153 tee_id,
1154 Some(input_metadata),
1155 Some(metadata),
1156 None,
1157 );
1158 drop(inner_borrow);
1159
1160 tee_id
1161 }
1162
1163 HydroNode::Partition {
1164 inner, metadata, ..
1165 } => {
1166 let ptr = inner.as_ptr();
1167 if let Some(&existing_id) = seen_tees.get(&ptr) {
1168 return existing_id;
1169 }
1170
1171 let input_id = inner
1172 .0
1173 .borrow()
1174 .build_graph_structure(structure, seen_tees, config);
1175 let partition_id = structure.add_node_with_metadata(
1176 NodeLabel::Static(extract_op_name(self.print_root())),
1177 HydroNodeType::Tee,
1178 metadata,
1179 );
1180
1181 seen_tees.insert(ptr, partition_id);
1182
1183 let inner_borrow = inner.0.borrow();
1185 let input_metadata = inner_borrow.metadata();
1186 add_edge_with_metadata(
1187 structure,
1188 input_id,
1189 partition_id,
1190 Some(input_metadata),
1191 Some(metadata),
1192 None,
1193 );
1194 drop(inner_borrow);
1195
1196 partition_id
1197 }
1198
1199 HydroNode::ObserveNonDet {
1201 inner, metadata, ..
1202 } => build_simple_transform(TransformParams {
1203 structure,
1204 seen_tees,
1205 config,
1206 input: inner,
1207 metadata,
1208 op_name: extract_op_name(self.print_root()),
1209 node_type: HydroNodeType::NonDeterministic,
1210 }),
1211
1212 HydroNode::Cast { inner, metadata }
1214 | HydroNode::AssertIsConsistent {
1215 inner, metadata, ..
1216 }
1217 | HydroNode::DeferTick {
1218 input: inner,
1219 metadata,
1220 }
1221 | HydroNode::Enumerate {
1222 input: inner,
1223 metadata,
1224 ..
1225 }
1226 | HydroNode::Unique {
1227 input: inner,
1228 metadata,
1229 }
1230 | HydroNode::ResolveFutures {
1231 input: inner,
1232 metadata,
1233 }
1234 | HydroNode::ResolveFuturesBlocking {
1235 input: inner,
1236 metadata,
1237 }
1238 | HydroNode::ResolveFuturesOrdered {
1239 input: inner,
1240 metadata,
1241 } => build_simple_transform(TransformParams {
1242 structure,
1243 seen_tees,
1244 config,
1245 input: inner,
1246 metadata,
1247 op_name: extract_op_name(self.print_root()),
1248 node_type: HydroNodeType::Transform,
1249 }),
1250
1251 HydroNode::Sort {
1253 input: inner,
1254 metadata,
1255 } => build_simple_transform(TransformParams {
1256 structure,
1257 seen_tees,
1258 config,
1259 input: inner,
1260 metadata,
1261 op_name: extract_op_name(self.print_root()),
1262 node_type: HydroNodeType::Aggregation,
1263 }),
1264
1265 HydroNode::Map {
1267 f, input, metadata, ..
1268 }
1269 | HydroNode::Filter { f, input, metadata }
1270 | HydroNode::FlatMap { f, input, metadata }
1271 | HydroNode::FlatMapStreamBlocking { f, input, metadata }
1272 | HydroNode::FilterMap { f, input, metadata }
1273 | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
1274 TransformParams {
1275 structure,
1276 seen_tees,
1277 config,
1278 input,
1279 metadata,
1280 op_name: extract_op_name(self.print_root()),
1281 node_type: HydroNodeType::Transform,
1282 },
1283 &f.expr,
1284 ),
1285
1286 HydroNode::Reduce { f, input, metadata }
1288 | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
1289 TransformParams {
1290 structure,
1291 seen_tees,
1292 config,
1293 input,
1294 metadata,
1295 op_name: extract_op_name(self.print_root()),
1296 node_type: HydroNodeType::Aggregation,
1297 },
1298 &f.expr,
1299 ),
1300
1301 HydroNode::Join {
1303 left,
1304 right,
1305 metadata,
1306 }
1307 | HydroNode::JoinHalf {
1308 left,
1309 right,
1310 metadata,
1311 }
1312 | HydroNode::CrossProduct {
1313 left,
1314 right,
1315 metadata,
1316 }
1317 | HydroNode::CrossSingleton {
1318 left,
1319 right,
1320 metadata,
1321 } => {
1322 let left_id = left.build_graph_structure(structure, seen_tees, config);
1323 let right_id = right.build_graph_structure(structure, seen_tees, config);
1324 let node_id = structure.add_node_with_metadata(
1325 NodeLabel::Static(extract_op_name(self.print_root())),
1326 HydroNodeType::Join,
1327 metadata,
1328 );
1329
1330 let left_metadata = left.metadata();
1332 add_edge_with_metadata(
1333 structure,
1334 left_id,
1335 node_id,
1336 Some(left_metadata),
1337 Some(metadata),
1338 Some("left".to_owned()),
1339 );
1340
1341 let right_metadata = right.metadata();
1343 add_edge_with_metadata(
1344 structure,
1345 right_id,
1346 node_id,
1347 Some(right_metadata),
1348 Some(metadata),
1349 Some("right".to_owned()),
1350 );
1351
1352 node_id
1353 }
1354
1355 HydroNode::Difference {
1357 pos: left,
1358 neg: right,
1359 metadata,
1360 }
1361 | HydroNode::AntiJoin {
1362 pos: left,
1363 neg: right,
1364 metadata,
1365 } => {
1366 let left_id = left.build_graph_structure(structure, seen_tees, config);
1367 let right_id = right.build_graph_structure(structure, seen_tees, config);
1368 let node_id = structure.add_node_with_metadata(
1369 NodeLabel::Static(extract_op_name(self.print_root())),
1370 HydroNodeType::Join,
1371 metadata,
1372 );
1373
1374 let left_metadata = left.metadata();
1376 add_edge_with_metadata(
1377 structure,
1378 left_id,
1379 node_id,
1380 Some(left_metadata),
1381 Some(metadata),
1382 Some("pos".to_owned()),
1383 );
1384
1385 let right_metadata = right.metadata();
1387 add_edge_with_metadata(
1388 structure,
1389 right_id,
1390 node_id,
1391 Some(right_metadata),
1392 Some(metadata),
1393 Some("neg".to_owned()),
1394 );
1395
1396 node_id
1397 }
1398
1399 HydroNode::Fold {
1401 init,
1402 acc,
1403 input,
1404 metadata,
1405 ..
1406 }
1407 | HydroNode::FoldKeyed {
1408 init,
1409 acc,
1410 input,
1411 metadata,
1412 ..
1413 }
1414 | HydroNode::Scan {
1415 init,
1416 acc,
1417 input,
1418 metadata,
1419 }
1420 | HydroNode::ScanAsyncBlocking {
1421 init,
1422 acc,
1423 input,
1424 metadata,
1425 } => {
1426 let node_type = HydroNodeType::Aggregation; build_dual_expr_transform(
1429 TransformParams {
1430 structure,
1431 seen_tees,
1432 config,
1433 input,
1434 metadata,
1435 op_name: extract_op_name(self.print_root()),
1436 node_type,
1437 },
1438 &init.expr,
1439 &acc.expr,
1440 )
1441 }
1442
1443 HydroNode::ReduceKeyedWatermark {
1445 f,
1446 input,
1447 watermark,
1448 metadata,
1449 } => {
1450 let input_id = input.build_graph_structure(structure, seen_tees, config);
1451 let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
1452 let location_key = Some(setup_location(structure, metadata));
1453 let join_node_id = structure.add_node_with_backtrace(
1454 NodeLabel::Static(extract_op_name(self.print_root())),
1455 HydroNodeType::Join,
1456 location_key,
1457 Some(metadata.op.backtrace.clone()),
1458 );
1459
1460 let input_metadata = input.metadata();
1462 add_edge_with_metadata(
1463 structure,
1464 input_id,
1465 join_node_id,
1466 Some(input_metadata),
1467 Some(metadata),
1468 Some("input".to_owned()),
1469 );
1470
1471 let watermark_metadata = watermark.metadata();
1473 add_edge_with_metadata(
1474 structure,
1475 watermark_id,
1476 join_node_id,
1477 Some(watermark_metadata),
1478 Some(metadata),
1479 Some("watermark".to_owned()),
1480 );
1481
1482 let node_id = structure.add_node_with_backtrace(
1483 NodeLabel::with_exprs(extract_op_name(self.print_root()), vec![f.expr.clone()]),
1484 HydroNodeType::Aggregation,
1485 location_key,
1486 Some(metadata.op.backtrace.clone()),
1487 );
1488
1489 let join_metadata = metadata; add_edge_with_metadata(
1492 structure,
1493 join_node_id,
1494 node_id,
1495 Some(join_metadata),
1496 Some(metadata),
1497 None,
1498 );
1499
1500 node_id
1501 }
1502
1503 HydroNode::Network {
1504 serialize_fn,
1505 deserialize_fn,
1506 input,
1507 metadata,
1508 ..
1509 } => {
1510 let input_id = input.build_graph_structure(structure, seen_tees, config);
1511 let _from_location_key = setup_location(structure, metadata);
1512
1513 let root = metadata.location_id.root();
1514 let to_location_key = root.key();
1515 let to_location_type = root.location_type().unwrap();
1516 structure.add_location(to_location_key, to_location_type);
1517
1518 let mut label = "network(".to_owned();
1519 if serialize_fn.is_some() {
1520 label.push_str("send");
1521 }
1522 if deserialize_fn.is_some() {
1523 if serialize_fn.is_some() {
1524 label.push_str(" + ");
1525 }
1526 label.push_str("recv");
1527 }
1528 label.push(')');
1529
1530 let network_id = structure.add_node_with_backtrace(
1531 NodeLabel::Static(label),
1532 HydroNodeType::Network,
1533 Some(to_location_key),
1534 Some(metadata.op.backtrace.clone()),
1535 );
1536
1537 let input_metadata = input.metadata();
1539 add_edge_with_metadata(
1540 structure,
1541 input_id,
1542 network_id,
1543 Some(input_metadata),
1544 Some(metadata),
1545 Some(format!("to {:?}({})", to_location_type, to_location_key)),
1546 );
1547
1548 network_id
1549 }
1550
1551 HydroNode::Batch { inner, metadata } => build_simple_transform(TransformParams {
1553 structure,
1554 seen_tees,
1555 config,
1556 input: inner,
1557 metadata,
1558 op_name: extract_op_name(self.print_root()),
1559 node_type: HydroNodeType::NonDeterministic,
1560 }),
1561
1562 HydroNode::YieldConcat { inner, .. } => {
1563 inner.build_graph_structure(structure, seen_tees, config)
1565 }
1566
1567 HydroNode::UnboundSingleton { inner, .. } => {
1568 inner.build_graph_structure(structure, seen_tees, config)
1569 }
1570
1571 HydroNode::BeginAtomic { inner, .. } => {
1572 inner.build_graph_structure(structure, seen_tees, config)
1573 }
1574
1575 HydroNode::EndAtomic { inner, .. } => {
1576 inner.build_graph_structure(structure, seen_tees, config)
1577 }
1578
1579 HydroNode::Chain {
1580 first,
1581 second,
1582 metadata,
1583 }
1584 | HydroNode::MergeOrdered {
1585 first,
1586 second,
1587 metadata,
1588 } => {
1589 let first_id = first.build_graph_structure(structure, seen_tees, config);
1590 let second_id = second.build_graph_structure(structure, seen_tees, config);
1591 let location_key = Some(setup_location(structure, metadata));
1592 let chain_id = structure.add_node_with_backtrace(
1593 NodeLabel::Static(extract_op_name(self.print_root())),
1594 HydroNodeType::Transform,
1595 location_key,
1596 Some(metadata.op.backtrace.clone()),
1597 );
1598
1599 let first_metadata = first.metadata();
1601 add_edge_with_metadata(
1602 structure,
1603 first_id,
1604 chain_id,
1605 Some(first_metadata),
1606 Some(metadata),
1607 Some("first".to_owned()),
1608 );
1609
1610 let second_metadata = second.metadata();
1612 add_edge_with_metadata(
1613 structure,
1614 second_id,
1615 chain_id,
1616 Some(second_metadata),
1617 Some(metadata),
1618 Some("second".to_owned()),
1619 );
1620
1621 chain_id
1622 }
1623
1624 HydroNode::ChainFirst {
1625 first,
1626 second,
1627 metadata,
1628 } => {
1629 let first_id = first.build_graph_structure(structure, seen_tees, config);
1630 let second_id = second.build_graph_structure(structure, seen_tees, config);
1631 let location_key = Some(setup_location(structure, metadata));
1632 let chain_id = structure.add_node_with_backtrace(
1633 NodeLabel::Static(extract_op_name(self.print_root())),
1634 HydroNodeType::Transform,
1635 location_key,
1636 Some(metadata.op.backtrace.clone()),
1637 );
1638
1639 let first_metadata = first.metadata();
1641 add_edge_with_metadata(
1642 structure,
1643 first_id,
1644 chain_id,
1645 Some(first_metadata),
1646 Some(metadata),
1647 Some("first".to_owned()),
1648 );
1649
1650 let second_metadata = second.metadata();
1652 add_edge_with_metadata(
1653 structure,
1654 second_id,
1655 chain_id,
1656 Some(second_metadata),
1657 Some(metadata),
1658 Some("second".to_owned()),
1659 );
1660
1661 chain_id
1662 }
1663
1664 HydroNode::Counter {
1665 tag: _,
1666 prefix: _,
1667 duration,
1668 input,
1669 metadata,
1670 } => build_single_expr_transform(
1671 TransformParams {
1672 structure,
1673 seen_tees,
1674 config,
1675 input,
1676 metadata,
1677 op_name: extract_op_name(self.print_root()),
1678 node_type: HydroNodeType::Transform,
1679 },
1680 duration,
1681 ),
1682 }
1683 }
1684}
1685
1686macro_rules! render_hydro_ir {
1689 ($name:ident, $write_fn:ident) => {
1690 pub fn $name(roots: &[HydroRoot], config: HydroWriteConfig<'_>) -> String {
1691 let mut output = String::new();
1692 $write_fn(&mut output, roots, config).unwrap();
1693 output
1694 }
1695 };
1696}
1697
1698macro_rules! write_hydro_ir {
1700 ($name:ident, $writer_type:ty, $constructor:expr) => {
1701 pub fn $name(
1702 output: impl std::fmt::Write,
1703 roots: &[HydroRoot],
1704 config: HydroWriteConfig<'_>,
1705 ) -> std::fmt::Result {
1706 let mut graph_write: $writer_type = $constructor(output, config);
1707 write_hydro_ir_graph(&mut graph_write, roots, config)
1708 }
1709 };
1710}
1711
1712render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1713write_hydro_ir!(
1714 write_hydro_ir_mermaid,
1715 HydroMermaid<_>,
1716 HydroMermaid::new_with_config
1717);
1718
1719render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1720write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1721
1722render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);
1724
1725render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
1727write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
1728
1729fn write_hydro_ir_graph<W>(
1730 graph_write: W,
1731 roots: &[HydroRoot],
1732 config: HydroWriteConfig<'_>,
1733) -> Result<(), W::Err>
1734where
1735 W: HydroGraphWrite,
1736{
1737 let mut structure = HydroGraphStructure::new();
1738 let mut seen_tees = HashMap::new();
1739
1740 for leaf in roots {
1742 leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1743 }
1744
1745 write_graph_structure(&structure, graph_write, config)
1746}