1use std::collections::HashMap;
2use std::fmt::Write;
3
4use serde_json;
5
6use super::render::{HydroEdgeType, HydroGraphWrite, HydroNodeType};
7
8pub struct HydroReactFlow<W> {
11 write: W,
12 nodes: Vec<serde_json::Value>,
13 edges: Vec<serde_json::Value>,
14 locations: HashMap<usize, (String, Vec<usize>)>, edge_count: usize,
16 config: super::render::HydroWriteConfig,
17 process_names: HashMap<usize, String>,
19 cluster_names: HashMap<usize, String>,
20 external_names: HashMap<usize, String>,
21}
22
23impl<W> HydroReactFlow<W> {
24 pub fn new(write: W, config: &super::render::HydroWriteConfig) -> Self {
25 let process_names: HashMap<usize, String> =
26 config.process_id_name.iter().cloned().collect();
27 let cluster_names: HashMap<usize, String> =
28 config.cluster_id_name.iter().cloned().collect();
29 let external_names: HashMap<usize, String> =
30 config.external_id_name.iter().cloned().collect();
31
32 Self {
33 write,
34 nodes: Vec::new(),
35 edges: Vec::new(),
36 locations: HashMap::new(),
37 edge_count: 0,
38 config: config.clone(),
39 process_names,
40 cluster_names,
41 external_names,
42 }
43 }
44
45 fn node_type_to_style(&self, _node_type: HydroNodeType) -> serde_json::Value {
46 let base_style = serde_json::json!({
48 "color": "#2d3748",
49 "border": "1px solid rgba(0, 0, 0, 0.1)",
50 "borderRadius": "12px",
51 "padding": "12px 16px",
52 "fontSize": "13px",
53 "fontFamily": "-apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif",
54 "fontWeight": "500",
55 "boxShadow": "0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06)",
56 "transition": "all 0.2s ease-in-out"
57 });
58
59 let mut style = base_style;
62
63 style["&:hover"] = serde_json::json!({
65 "transform": "translateY(-2px)",
66 "boxShadow": "0 8px 25px -5px rgba(0, 0, 0, 0.1), 0 4px 6px -2px rgba(0, 0, 0, 0.05)"
67 });
68
69 style
70 }
71 fn edge_type_to_style(&self, edge_type: HydroEdgeType) -> serde_json::Value {
72 let mut style = serde_json::json!({
74 "strokeWidth": 2,
75 "animated": false
76 });
77
78 match edge_type {
80 HydroEdgeType::Stream => {
81 style["stroke"] = serde_json::Value::String("#666666".to_string());
82 }
83 HydroEdgeType::Persistent => {
84 style["stroke"] = serde_json::Value::String("#008800".to_string());
85 style["strokeWidth"] = serde_json::Value::Number(serde_json::Number::from(3));
86 }
87 HydroEdgeType::Network => {
88 style["stroke"] = serde_json::Value::String("#880088".to_string());
89 style["strokeDasharray"] = serde_json::Value::String("5,5".to_string());
90 style["animated"] = serde_json::Value::Bool(true);
91 }
92 HydroEdgeType::Cycle => {
93 style["stroke"] = serde_json::Value::String("#ff0000".to_string());
94 style["animated"] = serde_json::Value::Bool(true);
95 }
96 }
97
98 style
99 }
100
101 fn apply_layout(&mut self) {
103 for node in &mut self.nodes {
105 node["position"]["x"] = serde_json::Value::Number(serde_json::Number::from(0));
106 node["position"]["y"] = serde_json::Value::Number(serde_json::Number::from(0));
107 }
108 }
109}
110
111impl<W> HydroGraphWrite for HydroReactFlow<W>
112where
113 W: Write,
114{
115 type Err = super::render::GraphWriteError;
116
117 fn write_prologue(&mut self) -> Result<(), Self::Err> {
118 self.nodes.clear();
120 self.edges.clear();
121 self.locations.clear();
122 self.edge_count = 0;
123 Ok(())
124 }
125
126 fn write_node_definition(
127 &mut self,
128 node_id: usize,
129 node_label: &super::render::NodeLabel,
130 node_type: HydroNodeType,
131 location_id: Option<usize>,
132 location_type: Option<&str>,
133 ) -> Result<(), Self::Err> {
134 let style = self.node_type_to_style(node_type);
135
136 let full_label = match node_label {
138 super::render::NodeLabel::Static(s) => s.clone(),
139 super::render::NodeLabel::WithExprs { op_name, exprs } => {
140 if exprs.is_empty() {
141 format!("{}()", op_name)
142 } else {
143 let expr_strs: Vec<String> = exprs.iter().map(|e| e.to_string()).collect();
145 format!("{}({})", op_name, expr_strs.join(", "))
146 }
147 }
148 };
149
150 let display_label = if self.config.use_short_labels {
152 super::render::extract_short_label(&full_label)
153 } else {
154 full_label.clone()
155 };
156
157 let short_label = super::render::extract_short_label(&full_label);
159
160 let enhanced_full_label = if short_label.len() >= full_label.len() - 2 {
162 match short_label.as_str() {
164 "inspect" => "inspect [debug output]".to_string(),
165 "persist" => "persist [state storage]".to_string(),
166 "tee" => "tee [branch dataflow]".to_string(),
167 "delta" => "delta [change detection]".to_string(),
168 "spin" => "spin [delay/buffer]".to_string(),
169 "send_bincode" => "send_bincode [send data to process/cluster]".to_string(),
170 "broadcast_bincode" => {
171 "broadcast_bincode [send data to all cluster members]".to_string()
172 }
173 "source_iter" => "source_iter [iterate over collection]".to_string(),
174 "source_stream" => "source_stream [receive external data stream]".to_string(),
175 "network(recv)" => "network(recv) [receive from network]".to_string(),
176 "network(send)" => "network(send) [send to network]".to_string(),
177 "dest_sink" => "dest_sink [output destination]".to_string(),
178 _ => {
179 if full_label.len() < 15 {
180 format!("{} [{}]", node_label, "hydro operator")
181 } else {
182 node_label.to_string()
183 }
184 }
185 }
186 } else {
187 node_label.to_string()
188 };
189
190 let node = serde_json::json!({
191 "id": node_id.to_string(),
192 "type": "default",
193 "data": {
194 "label": display_label,
195 "shortLabel": short_label,
196 "fullLabel": enhanced_full_label,
197 "expanded": false,
198 "locationId": location_id,
199 "locationType": location_type,
200 "nodeType": match node_type {
201 HydroNodeType::Source => "Source",
202 HydroNodeType::Transform => "Transform",
203 HydroNodeType::Join => "Join",
204 HydroNodeType::Aggregation => "Aggregation",
205 HydroNodeType::Network => "Network",
206 HydroNodeType::Sink => "Sink",
207 HydroNodeType::Tee => "Tee",
208 }
209 },
210 "position": {
211 "x": 0,
212 "y": 0
213 },
214 "style": style
215 });
216 self.nodes.push(node);
217 Ok(())
218 }
219
220 fn write_edge(
221 &mut self,
222 src_id: usize,
223 dst_id: usize,
224 edge_type: HydroEdgeType,
225 label: Option<&str>,
226 ) -> Result<(), Self::Err> {
227 let style = self.edge_type_to_style(edge_type);
228 let edge_id = format!("e{}", self.edge_count);
229 self.edge_count += 1;
230
231 let mut edge = serde_json::json!({
232 "id": edge_id,
233 "source": src_id.to_string(),
234 "target": dst_id.to_string(),
235 "style": style,
236 "type": "smoothstep",
238 "animated": false
241 });
242
243 if matches!(edge_type, HydroEdgeType::Network | HydroEdgeType::Cycle) {
245 edge["animated"] = serde_json::Value::Bool(true);
246 }
247
248 if let Some(label_text) = label {
249 edge["label"] = serde_json::Value::String(label_text.to_string());
250 edge["labelStyle"] = serde_json::json!({
251 "fontSize": "10px",
252 "fontFamily": "monospace",
253 "fill": "#333333",
254 "backgroundColor": "rgba(255, 255, 255, 0.8)",
255 "padding": "2px 4px",
256 "borderRadius": "3px"
257 });
258 edge["labelShowBg"] = serde_json::Value::Bool(true);
260 edge["labelBgStyle"] = serde_json::json!({
261 "fill": "rgba(255, 255, 255, 0.8)",
262 "fillOpacity": 0.8
263 });
264 }
265
266 self.edges.push(edge);
267 Ok(())
268 }
269
270 fn write_location_start(
271 &mut self,
272 location_id: usize,
273 location_type: &str,
274 ) -> Result<(), Self::Err> {
275 let location_label = match location_type {
276 "Process" => {
277 if let Some(name) = self.process_names.get(&location_id) {
278 name.clone()
279 } else {
280 format!("Process {}", location_id)
281 }
282 }
283 "Cluster" => {
284 if let Some(name) = self.cluster_names.get(&location_id) {
285 name.clone()
286 } else {
287 format!("Cluster {}", location_id)
288 }
289 }
290 "External" => {
291 if let Some(name) = self.external_names.get(&location_id) {
292 name.clone()
293 } else {
294 format!("External {}", location_id)
295 }
296 }
297 _ => location_type.to_string(),
298 };
299
300 self.locations
301 .insert(location_id, (location_label, Vec::new()));
302 Ok(())
303 }
304
305 fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err> {
306 if let Some((_, node_ids)) = self.locations.values_mut().last() {
308 node_ids.push(node_id);
309 }
310 Ok(())
311 }
312
313 fn write_location_end(&mut self) -> Result<(), Self::Err> {
314 Ok(())
316 }
317
318 fn write_epilogue(&mut self) -> Result<(), Self::Err> {
319 self.apply_layout();
321
322 let output = serde_json::json!({
324 "nodes": self.nodes,
325 "edges": self.edges,
326 "locations": self.locations.iter().map(|(id, (label, nodes))| {
327 serde_json::json!({
328 "id": id.to_string(),
329 "label": label,
330 "nodes": nodes
331 })
332 }).collect::<Vec<_>>()
333 });
334
335 write!(
336 self.write,
337 "{}",
338 serde_json::to_string_pretty(&output).unwrap()
339 )
340 }
341}
342
343pub fn hydro_ir_to_reactflow(
345 ir: &[crate::ir::HydroRoot],
346 process_names: Vec<(usize, String)>,
347 cluster_names: Vec<(usize, String)>,
348 external_names: Vec<(usize, String)>,
349) -> Result<String, Box<dyn std::error::Error>> {
350 let mut output = String::new();
351
352 let config = super::render::HydroWriteConfig {
353 show_metadata: false,
354 show_location_groups: true,
355 use_short_labels: true, process_id_name: process_names,
357 cluster_id_name: cluster_names,
358 external_id_name: external_names,
359 };
360
361 super::render::write_hydro_ir_reactflow(&mut output, ir, &config)?;
362
363 Ok(output)
364}
365
366pub fn open_reactflow_browser(
368 ir: &[crate::ir::HydroRoot],
369 process_names: Vec<(usize, String)>,
370 cluster_names: Vec<(usize, String)>,
371 external_names: Vec<(usize, String)>,
372) -> Result<(), Box<dyn std::error::Error>> {
373 let config = super::render::HydroWriteConfig {
374 process_id_name: process_names,
375 cluster_id_name: cluster_names,
376 external_id_name: external_names,
377 ..Default::default()
378 };
379
380 super::debug::open_reactflow_browser(ir, Some("hydro_graph.html"), Some(config))
381 .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
382}
383
384pub fn save_reactflow_json(
386 ir: &[crate::ir::HydroRoot],
387 process_names: Vec<(usize, String)>,
388 cluster_names: Vec<(usize, String)>,
389 external_names: Vec<(usize, String)>,
390 filename: &str,
391) -> Result<std::path::PathBuf, Box<dyn std::error::Error>> {
392 let config = super::render::HydroWriteConfig {
393 process_id_name: process_names,
394 cluster_id_name: cluster_names,
395 external_id_name: external_names,
396 ..Default::default()
397 };
398
399 super::debug::save_reactflow_json(ir, Some(filename), Some(config))
400 .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
401}
402
/// Convenience wrapper around [`open_reactflow_browser`] for a built flow.
///
/// Passes the flow's IR along with clones of its process, cluster and
/// external name tables.
///
/// # Errors
///
/// Propagates any error from [`open_reactflow_browser`].
#[cfg(feature = "build")]
pub fn open_browser(
    built_flow: &crate::builder::built::BuiltFlow,
) -> Result<(), Box<dyn std::error::Error>> {
    let ir = built_flow.ir();
    let process_names = built_flow.process_id_name().clone();
    let cluster_names = built_flow.cluster_id_name().clone();
    let external_names = built_flow.external_id_name().clone();

    open_reactflow_browser(ir, process_names, cluster_names, external_names)
}