1#![warn(missing_docs)]
2
3extern crate proc_macro;
4
5use std::collections::{BTreeMap, BTreeSet, VecDeque};
6use std::fmt::Debug;
7use std::iter::FusedIterator;
8
9use itertools::Itertools;
10use proc_macro2::{Ident, Literal, Span, TokenStream};
11use quote::{ToTokens, TokenStreamExt, format_ident, quote, quote_spanned};
12use serde::{Deserialize, Serialize};
13use slotmap::{Key, SecondaryMap, SlotMap, SparseSecondaryMap};
14use syn::spanned::Spanned;
15
16use super::graph_write::{Dot, GraphWrite, Mermaid};
17use super::ops::{
18 DelayType, OPERATORS, OperatorWriteOutput, WriteContextArgs, find_op_op_constraints,
19 null_write_iterator_fn,
20};
21use super::{
22 CONTEXT, Color, DiMulGraph, GRAPH, GraphEdgeId, GraphLoopId, GraphNode, GraphNodeId,
23 GraphSubgraphId, HANDOFF_NODE_STR, MODULE_BOUNDARY_NODE_STR, OperatorInstance, PortIndexValue,
24 Varname, change_spans, get_operator_generics,
25};
26use crate::diagnostic::{Diagnostic, Diagnostics, Level};
27use crate::pretty_span::{PrettyRowCol, PrettySpan};
28use crate::process_singletons;
29
30#[derive(Default, Debug, Serialize, Deserialize)]
40pub struct DfirGraph {
41 nodes: SlotMap<GraphNodeId, GraphNode>,
43
44 #[serde(skip)]
47 operator_instances: SecondaryMap<GraphNodeId, OperatorInstance>,
48 operator_tag: SecondaryMap<GraphNodeId, String>,
50 graph: DiMulGraph<GraphNodeId, GraphEdgeId>,
52 ports: SecondaryMap<GraphEdgeId, (PortIndexValue, PortIndexValue)>,
54
55 node_loops: SecondaryMap<GraphNodeId, GraphLoopId>,
57 loop_nodes: SlotMap<GraphLoopId, Vec<GraphNodeId>>,
59 loop_parent: SparseSecondaryMap<GraphLoopId, GraphLoopId>,
61 root_loops: Vec<GraphLoopId>,
63 loop_children: SecondaryMap<GraphLoopId, Vec<GraphLoopId>>,
65
66 node_subgraph: SecondaryMap<GraphNodeId, GraphSubgraphId>,
68
69 subgraph_nodes: SlotMap<GraphSubgraphId, Vec<GraphNodeId>>,
71 subgraph_stratum: SecondaryMap<GraphSubgraphId, usize>,
73
74 node_singleton_references: SparseSecondaryMap<GraphNodeId, Vec<Option<GraphNodeId>>>,
76 node_varnames: SparseSecondaryMap<GraphNodeId, Varname>,
78
79 subgraph_laziness: SecondaryMap<GraphSubgraphId, bool>,
83}
84
85impl DfirGraph {
87 pub fn new() -> Self {
89 Default::default()
90 }
91}
92
93impl DfirGraph {
95 pub fn node(&self, node_id: GraphNodeId) -> &GraphNode {
97 self.nodes.get(node_id).expect("Node not found.")
98 }
99
100 pub fn node_op_inst(&self, node_id: GraphNodeId) -> Option<&OperatorInstance> {
105 self.operator_instances.get(node_id)
106 }
107
108 pub fn node_varname(&self, node_id: GraphNodeId) -> Option<&Varname> {
110 self.node_varnames.get(node_id)
111 }
112
113 pub fn node_subgraph(&self, node_id: GraphNodeId) -> Option<GraphSubgraphId> {
115 self.node_subgraph.get(node_id).copied()
116 }
117
118 pub fn node_degree_in(&self, node_id: GraphNodeId) -> usize {
120 self.graph.degree_in(node_id)
121 }
122
123 pub fn node_degree_out(&self, node_id: GraphNodeId) -> usize {
125 self.graph.degree_out(node_id)
126 }
127
128 pub fn node_successors(
130 &self,
131 src: GraphNodeId,
132 ) -> impl '_
133 + DoubleEndedIterator<Item = (GraphEdgeId, GraphNodeId)>
134 + ExactSizeIterator
135 + FusedIterator
136 + Clone
137 + Debug {
138 self.graph.successors(src)
139 }
140
141 pub fn node_predecessors(
143 &self,
144 dst: GraphNodeId,
145 ) -> impl '_
146 + DoubleEndedIterator<Item = (GraphEdgeId, GraphNodeId)>
147 + ExactSizeIterator
148 + FusedIterator
149 + Clone
150 + Debug {
151 self.graph.predecessors(dst)
152 }
153
154 pub fn node_successor_edges(
156 &self,
157 src: GraphNodeId,
158 ) -> impl '_
159 + DoubleEndedIterator<Item = GraphEdgeId>
160 + ExactSizeIterator
161 + FusedIterator
162 + Clone
163 + Debug {
164 self.graph.successor_edges(src)
165 }
166
167 pub fn node_predecessor_edges(
169 &self,
170 dst: GraphNodeId,
171 ) -> impl '_
172 + DoubleEndedIterator<Item = GraphEdgeId>
173 + ExactSizeIterator
174 + FusedIterator
175 + Clone
176 + Debug {
177 self.graph.predecessor_edges(dst)
178 }
179
180 pub fn node_successor_nodes(
182 &self,
183 src: GraphNodeId,
184 ) -> impl '_
185 + DoubleEndedIterator<Item = GraphNodeId>
186 + ExactSizeIterator
187 + FusedIterator
188 + Clone
189 + Debug {
190 self.graph.successor_vertices(src)
191 }
192
193 pub fn node_predecessor_nodes(
195 &self,
196 dst: GraphNodeId,
197 ) -> impl '_
198 + DoubleEndedIterator<Item = GraphNodeId>
199 + ExactSizeIterator
200 + FusedIterator
201 + Clone
202 + Debug {
203 self.graph.predecessor_vertices(dst)
204 }
205
206 pub fn node_ids(&self) -> slotmap::basic::Keys<'_, GraphNodeId, GraphNode> {
208 self.nodes.keys()
209 }
210
211 pub fn nodes(&self) -> slotmap::basic::Iter<'_, GraphNodeId, GraphNode> {
213 self.nodes.iter()
214 }
215
216 pub fn insert_node(
218 &mut self,
219 node: GraphNode,
220 varname_opt: Option<Ident>,
221 loop_opt: Option<GraphLoopId>,
222 ) -> GraphNodeId {
223 let node_id = self.nodes.insert(node);
224 if let Some(varname) = varname_opt {
225 self.node_varnames.insert(node_id, Varname(varname));
226 }
227 if let Some(loop_id) = loop_opt {
228 self.node_loops.insert(node_id, loop_id);
229 self.loop_nodes[loop_id].push(node_id);
230 }
231 node_id
232 }
233
234 pub fn insert_node_op_inst(&mut self, node_id: GraphNodeId, op_inst: OperatorInstance) {
236 assert!(matches!(
237 self.nodes.get(node_id),
238 Some(GraphNode::Operator(_))
239 ));
240 let old_inst = self.operator_instances.insert(node_id, op_inst);
241 assert!(old_inst.is_none());
242 }
243
244 pub fn insert_node_op_insts_all(&mut self, diagnostics: &mut Diagnostics) {
246 let mut op_insts = Vec::new();
247 for (node_id, node) in self.nodes() {
248 let GraphNode::Operator(operator) = node else {
249 continue;
250 };
251 if self.node_op_inst(node_id).is_some() {
252 continue;
253 };
254
255 let Some(op_constraints) = find_op_op_constraints(operator) else {
257 diagnostics.push(Diagnostic::spanned(
258 operator.path.span(),
259 Level::Error,
260 format!("Unknown operator `{}`", operator.name_string()),
261 ));
262 continue;
263 };
264
265 let (input_ports, output_ports) = {
267 let mut input_edges: Vec<(&PortIndexValue, GraphNodeId)> = self
268 .node_predecessors(node_id)
269 .map(|(edge_id, pred_id)| (self.edge_ports(edge_id).1, pred_id))
270 .collect();
271 input_edges.sort();
273 let input_ports: Vec<PortIndexValue> = input_edges
274 .into_iter()
275 .map(|(port, _pred)| port)
276 .cloned()
277 .collect();
278
279 let mut output_edges: Vec<(&PortIndexValue, GraphNodeId)> = self
281 .node_successors(node_id)
282 .map(|(edge_id, succ)| (self.edge_ports(edge_id).0, succ))
283 .collect();
284 output_edges.sort();
286 let output_ports: Vec<PortIndexValue> = output_edges
287 .into_iter()
288 .map(|(port, _succ)| port)
289 .cloned()
290 .collect();
291
292 (input_ports, output_ports)
293 };
294
295 let generics = get_operator_generics(diagnostics, operator);
297 {
299 let generics_span = generics
301 .generic_args
302 .as_ref()
303 .map(Spanned::span)
304 .unwrap_or_else(|| operator.path.span());
305
306 if !op_constraints
307 .persistence_args
308 .contains(&generics.persistence_args.len())
309 {
310 diagnostics.push(Diagnostic::spanned(
311 generics.persistence_args_span().unwrap_or(generics_span),
312 Level::Error,
313 format!(
314 "`{}` should have {} persistence lifetime arguments, actually has {}.",
315 op_constraints.name,
316 op_constraints.persistence_args.human_string(),
317 generics.persistence_args.len()
318 ),
319 ));
320 }
321 if !op_constraints.type_args.contains(&generics.type_args.len()) {
322 diagnostics.push(Diagnostic::spanned(
323 generics.type_args_span().unwrap_or(generics_span),
324 Level::Error,
325 format!(
326 "`{}` should have {} generic type arguments, actually has {}.",
327 op_constraints.name,
328 op_constraints.type_args.human_string(),
329 generics.type_args.len()
330 ),
331 ));
332 }
333 }
334
335 op_insts.push((
336 node_id,
337 OperatorInstance {
338 op_constraints,
339 input_ports,
340 output_ports,
341 singletons_referenced: operator.singletons_referenced.clone(),
342 generics,
343 arguments_pre: operator.args.clone(),
344 arguments_raw: operator.args_raw.clone(),
345 },
346 ));
347 }
348
349 for (node_id, op_inst) in op_insts {
350 self.insert_node_op_inst(node_id, op_inst);
351 }
352 }
353
354 pub fn insert_intermediate_node(
366 &mut self,
367 edge_id: GraphEdgeId,
368 new_node: GraphNode,
369 ) -> (GraphNodeId, GraphEdgeId) {
370 let span = Some(new_node.span());
371
372 let op_inst_opt = 'oc: {
374 let GraphNode::Operator(operator) = &new_node else {
375 break 'oc None;
376 };
377 let Some(op_constraints) = find_op_op_constraints(operator) else {
378 break 'oc None;
379 };
380 let (input_port, output_port) = self.ports.get(edge_id).cloned().unwrap();
381
382 let mut dummy_diagnostics = Diagnostics::new();
383 let generics = get_operator_generics(&mut dummy_diagnostics, operator);
384 assert!(dummy_diagnostics.is_empty());
385
386 Some(OperatorInstance {
387 op_constraints,
388 input_ports: vec![input_port],
389 output_ports: vec![output_port],
390 singletons_referenced: operator.singletons_referenced.clone(),
391 generics,
392 arguments_pre: operator.args.clone(),
393 arguments_raw: operator.args_raw.clone(),
394 })
395 };
396
397 let node_id = self.nodes.insert(new_node);
399 if let Some(op_inst) = op_inst_opt {
401 self.operator_instances.insert(node_id, op_inst);
402 }
403 let (e0, e1) = self
405 .graph
406 .insert_intermediate_vertex(node_id, edge_id)
407 .unwrap();
408
409 let (src_idx, dst_idx) = self.ports.remove(edge_id).unwrap();
411 self.ports
412 .insert(e0, (src_idx, PortIndexValue::Elided(span)));
413 self.ports
414 .insert(e1, (PortIndexValue::Elided(span), dst_idx));
415
416 (node_id, e1)
417 }
418
419 pub fn remove_intermediate_node(&mut self, node_id: GraphNodeId) {
422 assert_eq!(
423 1,
424 self.node_degree_in(node_id),
425 "Removed intermediate node must have one predecessor"
426 );
427 assert_eq!(
428 1,
429 self.node_degree_out(node_id),
430 "Removed intermediate node must have one successor"
431 );
432 assert!(
433 self.node_subgraph.is_empty() && self.subgraph_nodes.is_empty(),
434 "Should not remove intermediate node after subgraph partitioning"
435 );
436
437 assert!(self.nodes.remove(node_id).is_some());
438 let (new_edge_id, (pred_edge_id, succ_edge_id)) =
439 self.graph.remove_intermediate_vertex(node_id).unwrap();
440 self.operator_instances.remove(node_id);
441 self.node_varnames.remove(node_id);
442
443 let (src_port, _) = self.ports.remove(pred_edge_id).unwrap();
444 let (_, dst_port) = self.ports.remove(succ_edge_id).unwrap();
445 self.ports.insert(new_edge_id, (src_port, dst_port));
446 }
447
448 pub(crate) fn node_color(&self, node_id: GraphNodeId) -> Option<Color> {
454 if matches!(self.node(node_id), GraphNode::Handoff { .. }) {
455 return Some(Color::Hoff);
456 }
457
458 if let GraphNode::Operator(op) = self.node(node_id)
460 && (op.name_string() == "resolve_futures_blocking"
461 || op.name_string() == "resolve_futures_blocking_ordered")
462 {
463 return Some(Color::Push);
464 }
465
466 let inn_degree = self.node_predecessor_nodes(node_id).count();
468 let out_degree = self.node_successor_nodes(node_id).count();
470
471 match (inn_degree, out_degree) {
472 (0, 0) => None, (0, 1) => Some(Color::Pull),
474 (1, 0) => Some(Color::Push),
475 (1, 1) => None, (_many, 0 | 1) => Some(Color::Pull),
477 (0 | 1, _many) => Some(Color::Push),
478 (_many, _to_many) => Some(Color::Comp),
479 }
480 }
481
482 pub fn set_operator_tag(&mut self, node_id: GraphNodeId, tag: String) {
484 self.operator_tag.insert(node_id, tag);
485 }
486}
487
488impl DfirGraph {
490 pub fn set_node_singleton_references(
493 &mut self,
494 node_id: GraphNodeId,
495 singletons_referenced: Vec<Option<GraphNodeId>>,
496 ) -> Option<Vec<Option<GraphNodeId>>> {
497 self.node_singleton_references
498 .insert(node_id, singletons_referenced)
499 }
500
501 pub fn node_singleton_references(&self, node_id: GraphNodeId) -> &[Option<GraphNodeId>] {
504 self.node_singleton_references
505 .get(node_id)
506 .map(std::ops::Deref::deref)
507 .unwrap_or_default()
508 }
509}
510
511impl DfirGraph {
513 pub fn merge_modules(&mut self) -> Result<(), Diagnostic> {
521 let mod_bound_nodes = self
522 .nodes()
523 .filter(|(_nid, node)| matches!(node, GraphNode::ModuleBoundary { .. }))
524 .map(|(nid, _node)| nid)
525 .collect::<Vec<_>>();
526
527 for mod_bound_node in mod_bound_nodes {
528 self.remove_module_boundary(mod_bound_node)?;
529 }
530
531 Ok(())
532 }
533
534 fn remove_module_boundary(&mut self, mod_bound_node: GraphNodeId) -> Result<(), Diagnostic> {
538 assert!(
539 self.node_subgraph.is_empty() && self.subgraph_nodes.is_empty(),
540 "Should not remove intermediate node after subgraph partitioning"
541 );
542
543 let mut mod_pred_ports = BTreeMap::new();
544 let mut mod_succ_ports = BTreeMap::new();
545
546 for mod_out_edge in self.node_predecessor_edges(mod_bound_node) {
547 let (pred_port, succ_port) = self.edge_ports(mod_out_edge);
548 mod_pred_ports.insert(succ_port.clone(), (mod_out_edge, pred_port.clone()));
549 }
550
551 for mod_inn_edge in self.node_successor_edges(mod_bound_node) {
552 let (pred_port, succ_port) = self.edge_ports(mod_inn_edge);
553 mod_succ_ports.insert(pred_port.clone(), (mod_inn_edge, succ_port.clone()));
554 }
555
556 if mod_pred_ports.keys().collect::<BTreeSet<_>>()
557 != mod_succ_ports.keys().collect::<BTreeSet<_>>()
558 {
559 let GraphNode::ModuleBoundary { input, import_expr } = self.node(mod_bound_node) else {
561 panic!();
562 };
563
564 if *input {
565 return Err(Diagnostic {
566 span: *import_expr,
567 level: Level::Error,
568 message: format!(
569 "The ports into the module did not match. input: {:?}, expected: {:?}",
570 mod_pred_ports.keys().map(|x| x.to_string()).join(", "),
571 mod_succ_ports.keys().map(|x| x.to_string()).join(", ")
572 ),
573 });
574 } else {
575 return Err(Diagnostic {
576 span: *import_expr,
577 level: Level::Error,
578 message: format!(
579 "The ports out of the module did not match. output: {:?}, expected: {:?}",
580 mod_succ_ports.keys().map(|x| x.to_string()).join(", "),
581 mod_pred_ports.keys().map(|x| x.to_string()).join(", "),
582 ),
583 });
584 }
585 }
586
587 for (port, (pred_edge, pred_port)) in mod_pred_ports {
588 let (succ_edge, succ_port) = mod_succ_ports.remove(&port).unwrap();
589
590 let (src, _) = self.edge(pred_edge);
591 let (_, dst) = self.edge(succ_edge);
592 self.remove_edge(pred_edge);
593 self.remove_edge(succ_edge);
594
595 let new_edge_id = self.graph.insert_edge(src, dst);
596 self.ports.insert(new_edge_id, (pred_port, succ_port));
597 }
598
599 self.graph.remove_vertex(mod_bound_node);
600 self.nodes.remove(mod_bound_node);
601
602 Ok(())
603 }
604}
605
606impl DfirGraph {
608 pub fn edge(&self, edge_id: GraphEdgeId) -> (GraphNodeId, GraphNodeId) {
610 let (src, dst) = self.graph.edge(edge_id).expect("Edge not found.");
611 (src, dst)
612 }
613
614 pub fn edge_ports(&self, edge_id: GraphEdgeId) -> (&PortIndexValue, &PortIndexValue) {
616 let (src_port, dst_port) = self.ports.get(edge_id).expect("Edge not found.");
617 (src_port, dst_port)
618 }
619
620 pub fn edge_ids(&self) -> slotmap::basic::Keys<'_, GraphEdgeId, (GraphNodeId, GraphNodeId)> {
622 self.graph.edge_ids()
623 }
624
625 pub fn edges(
627 &self,
628 ) -> impl '_
629 + ExactSizeIterator<Item = (GraphEdgeId, (GraphNodeId, GraphNodeId))>
630 + FusedIterator
631 + Clone
632 + Debug {
633 self.graph.edges()
634 }
635
636 pub fn insert_edge(
638 &mut self,
639 src: GraphNodeId,
640 src_port: PortIndexValue,
641 dst: GraphNodeId,
642 dst_port: PortIndexValue,
643 ) -> GraphEdgeId {
644 let edge_id = self.graph.insert_edge(src, dst);
645 self.ports.insert(edge_id, (src_port, dst_port));
646 edge_id
647 }
648
649 pub fn remove_edge(&mut self, edge: GraphEdgeId) {
651 let (_src, _dst) = self.graph.remove_edge(edge).unwrap();
652 let (_src_port, _dst_port) = self.ports.remove(edge).unwrap();
653 }
654}
655
656impl DfirGraph {
658 pub fn subgraph(&self, subgraph_id: GraphSubgraphId) -> &Vec<GraphNodeId> {
660 self.subgraph_nodes
661 .get(subgraph_id)
662 .expect("Subgraph not found.")
663 }
664
665 pub fn subgraph_ids(&self) -> slotmap::basic::Keys<'_, GraphSubgraphId, Vec<GraphNodeId>> {
667 self.subgraph_nodes.keys()
668 }
669
670 pub fn subgraphs(&self) -> slotmap::basic::Iter<'_, GraphSubgraphId, Vec<GraphNodeId>> {
672 self.subgraph_nodes.iter()
673 }
674
675 pub fn insert_subgraph(
677 &mut self,
678 node_ids: Vec<GraphNodeId>,
679 ) -> Result<GraphSubgraphId, (GraphNodeId, GraphSubgraphId)> {
680 for &node_id in node_ids.iter() {
682 if let Some(&old_sg_id) = self.node_subgraph.get(node_id) {
683 return Err((node_id, old_sg_id));
684 }
685 }
686 let subgraph_id = self.subgraph_nodes.insert_with_key(|sg_id| {
687 for &node_id in node_ids.iter() {
688 self.node_subgraph.insert(node_id, sg_id);
689 }
690 node_ids
691 });
692
693 Ok(subgraph_id)
694 }
695
696 pub fn remove_from_subgraph(&mut self, node_id: GraphNodeId) -> bool {
698 if let Some(old_sg_id) = self.node_subgraph.remove(node_id) {
699 self.subgraph_nodes[old_sg_id].retain(|&other_node_id| other_node_id != node_id);
700 true
701 } else {
702 false
703 }
704 }
705
706 pub fn subgraph_stratum(&self, sg_id: GraphSubgraphId) -> Option<usize> {
708 self.subgraph_stratum.get(sg_id).copied()
709 }
710
711 pub fn set_subgraph_stratum(
713 &mut self,
714 sg_id: GraphSubgraphId,
715 stratum: usize,
716 ) -> Option<usize> {
717 self.subgraph_stratum.insert(sg_id, stratum)
718 }
719
720 fn subgraph_laziness(&self, sg_id: GraphSubgraphId) -> bool {
722 self.subgraph_laziness.get(sg_id).copied().unwrap_or(false)
723 }
724
725 pub fn set_subgraph_laziness(&mut self, sg_id: GraphSubgraphId, lazy: bool) -> bool {
727 self.subgraph_laziness.insert(sg_id, lazy).unwrap_or(false)
728 }
729
730 pub fn max_stratum(&self) -> Option<usize> {
732 self.subgraph_stratum.values().copied().max()
733 }
734
735 fn find_pull_to_push_idx(&self, subgraph_nodes: &[GraphNodeId]) -> usize {
737 subgraph_nodes
738 .iter()
739 .position(|&node_id| {
740 self.node_color(node_id)
741 .is_some_and(|color| Color::Pull != color)
742 })
743 .unwrap_or(subgraph_nodes.len())
744 }
745}
746
747impl DfirGraph {
749 fn node_as_ident(&self, node_id: GraphNodeId, is_pred: bool) -> Ident {
751 let name = match &self.nodes[node_id] {
752 GraphNode::Operator(_) => format!("op_{:?}", node_id.data()),
753 GraphNode::Handoff { .. } => format!(
754 "hoff_{:?}_{}",
755 node_id.data(),
756 if is_pred { "recv" } else { "send" }
757 ),
758 GraphNode::ModuleBoundary { .. } => panic!(),
759 };
760 let span = match (is_pred, &self.nodes[node_id]) {
761 (_, GraphNode::Operator(operator)) => operator.span(),
762 (true, &GraphNode::Handoff { src_span, .. }) => src_span,
763 (false, &GraphNode::Handoff { dst_span, .. }) => dst_span,
764 (_, GraphNode::ModuleBoundary { .. }) => panic!(),
765 };
766 Ident::new(&name, span)
767 }
768
769 fn node_as_singleton_ident(&self, node_id: GraphNodeId, span: Span) -> Ident {
771 Ident::new(&format!("singleton_op_{:?}", node_id.data()), span)
772 }
773
774 fn helper_resolve_singletons(&self, node_id: GraphNodeId, span: Span) -> Vec<Ident> {
776 self.node_singleton_references(node_id)
777 .iter()
778 .map(|singleton_node_id| {
779 self.node_as_singleton_ident(
781 singleton_node_id
782 .expect("Expected singleton to be resolved but was not, this is a bug."),
783 span,
784 )
785 })
786 .collect::<Vec<_>>()
787 }
788
789 fn helper_collect_subgraph_handoffs(
792 &self,
793 ) -> SecondaryMap<GraphSubgraphId, (Vec<GraphNodeId>, Vec<GraphNodeId>)> {
794 let mut subgraph_handoffs: SecondaryMap<
796 GraphSubgraphId,
797 (Vec<GraphNodeId>, Vec<GraphNodeId>),
798 > = self
799 .subgraph_nodes
800 .keys()
801 .map(|k| (k, Default::default()))
802 .collect();
803
804 for (hoff_id, node) in self.nodes() {
806 if !matches!(node, GraphNode::Handoff { .. }) {
807 continue;
808 }
809 for (_edge, succ_id) in self.node_successors(hoff_id) {
811 let succ_sg = self.node_subgraph(succ_id).unwrap();
812 subgraph_handoffs[succ_sg].0.push(hoff_id);
813 }
814 for (_edge, pred_id) in self.node_predecessors(hoff_id) {
816 let pred_sg = self.node_subgraph(pred_id).unwrap();
817 subgraph_handoffs[pred_sg].1.push(hoff_id);
818 }
819 }
820
821 subgraph_handoffs
822 }
823
824 fn codegen_nested_loops(&self, df: &Ident) -> TokenStream {
826 let mut out = TokenStream::new();
828 let mut queue = VecDeque::from_iter(self.root_loops.iter().copied());
829 while let Some(loop_id) = queue.pop_front() {
830 let parent_opt = self
831 .loop_parent(loop_id)
832 .map(|loop_id| loop_id.as_ident(Span::call_site()))
833 .map(|ident| quote! { Some(#ident) })
834 .unwrap_or_else(|| quote! { None });
835 let loop_name = loop_id.as_ident(Span::call_site());
836 out.append_all(quote! {
837 let #loop_name = #df.add_loop(#parent_opt);
838 });
839 queue.extend(self.loop_children.get(loop_id).into_iter().flatten());
840 }
841 out
842 }
843
844 pub fn as_code(
851 &self,
852 root: &TokenStream,
853 include_type_guards: bool,
854 prefix: TokenStream,
855 diagnostics: &mut Diagnostics,
856 ) -> Result<TokenStream, Diagnostics> {
857 let df = Ident::new(GRAPH, Span::call_site());
858 let context = Ident::new(CONTEXT, Span::call_site());
859
860 let handoff_code = self
862 .nodes
863 .iter()
864 .filter_map(|(node_id, node)| match node {
865 GraphNode::Operator(_) => None,
866 &GraphNode::Handoff { src_span, dst_span } => Some((node_id, (src_span, dst_span))),
867 GraphNode::ModuleBoundary { .. } => panic!(),
868 })
869 .map(|(node_id, (src_span, dst_span))| {
870 let ident_send = Ident::new(&format!("hoff_{:?}_send", node_id.data()), dst_span);
871 let ident_recv = Ident::new(&format!("hoff_{:?}_recv", node_id.data()), src_span);
872 let span = src_span.join(dst_span).unwrap_or(src_span);
873 let mut hoff_name = Literal::string(&format!("handoff {:?}", node_id));
874 hoff_name.set_span(span);
875 let hoff_type = quote_spanned! (span=> #root::scheduled::handoff::VecHandoff<_>);
876 quote_spanned! {span=>
877 let (#ident_send, #ident_recv) =
878 #df.make_edge::<_, #hoff_type>(#hoff_name);
879 }
880 });
881
882 let subgraph_handoffs = self.helper_collect_subgraph_handoffs();
883
884 let (subgraphs_without_preds, subgraphs_with_preds) = self
886 .subgraph_nodes
887 .iter()
888 .partition::<Vec<_>, _>(|(_, nodes)| {
889 nodes
890 .iter()
891 .any(|&node_id| self.node_degree_in(node_id) == 0)
892 });
893
894 let mut op_prologue_code = Vec::new();
895 let mut op_prologue_after_code = Vec::new();
896 let mut subgraphs = Vec::new();
897 {
898 for &(subgraph_id, subgraph_nodes) in subgraphs_without_preds
899 .iter()
900 .chain(subgraphs_with_preds.iter())
901 {
902 let (recv_hoffs, send_hoffs) = &subgraph_handoffs[subgraph_id];
903 let recv_ports: Vec<Ident> = recv_hoffs
904 .iter()
905 .map(|&hoff_id| self.node_as_ident(hoff_id, true))
906 .collect();
907 let send_ports: Vec<Ident> = send_hoffs
908 .iter()
909 .map(|&hoff_id| self.node_as_ident(hoff_id, false))
910 .collect();
911
912 let recv_port_code = recv_ports.iter().map(|ident| {
913 quote_spanned! {ident.span()=>
914 let mut #ident = #ident.borrow_mut_swap();
915 let #ident = #root::dfir_pipes::pull::iter(#ident.drain(..));
916 }
917 });
918 let send_port_code = send_ports.iter().map(|ident| {
919 quote_spanned! {ident.span()=>
920 let mut #ident = #ident.borrow_mut_give();
921 let #ident = #root::dfir_pipes::push::vec_push(&mut *#ident);
922 }
923 });
924
925 let loop_id = self
926 .node_loop(subgraph_nodes[0]);
928
929 let mut subgraph_op_iter_code = Vec::new();
930 let mut subgraph_op_iter_after_code = Vec::new();
931 {
932 let pull_to_push_idx = self.find_pull_to_push_idx(subgraph_nodes);
933
934 let (pull_half, push_half) = subgraph_nodes.split_at(pull_to_push_idx);
935 let nodes_iter = pull_half.iter().chain(push_half.iter().rev());
936
937 for (idx, &node_id) in nodes_iter.enumerate() {
938 let node = &self.nodes[node_id];
939 assert!(
940 matches!(node, GraphNode::Operator(_)),
941 "Handoffs are not part of subgraphs."
942 );
943 let op_inst = &self.operator_instances[node_id];
944
945 let op_span = node.span();
946 let op_name = op_inst.op_constraints.name;
947 let root = change_spans(root.clone(), op_span);
949 let op_constraints = OPERATORS
951 .iter()
952 .find(|op| op_name == op.name)
953 .unwrap_or_else(|| panic!("Failed to find op: {}", op_name));
954
955 let ident = self.node_as_ident(node_id, false);
956
957 {
958 let mut input_edges = self
961 .graph
962 .predecessor_edges(node_id)
963 .map(|edge_id| (self.edge_ports(edge_id).1, edge_id))
964 .collect::<Vec<_>>();
965 input_edges.sort();
967
968 let inputs = input_edges
969 .iter()
970 .map(|&(_port, edge_id)| {
971 let (pred, _) = self.edge(edge_id);
972 self.node_as_ident(pred, true)
973 })
974 .collect::<Vec<_>>();
975
976 let mut output_edges = self
978 .graph
979 .successor_edges(node_id)
980 .map(|edge_id| (&self.ports[edge_id].0, edge_id))
981 .collect::<Vec<_>>();
982 output_edges.sort();
984
985 let outputs = output_edges
986 .iter()
987 .map(|&(_port, edge_id)| {
988 let (_, succ) = self.edge(edge_id);
989 self.node_as_ident(succ, false)
990 })
991 .collect::<Vec<_>>();
992
993 let is_pull = idx < pull_to_push_idx;
994
995 let singleton_output_ident = &if op_constraints.has_singleton_output {
996 self.node_as_singleton_ident(node_id, op_span)
997 } else {
998 Ident::new(&format!("{}_has_no_singleton_output", op_name), op_span)
1000 };
1001
1002 let df_local = &Ident::new(GRAPH, op_span.resolved_at(df.span()));
1011 let context = &Ident::new(CONTEXT, op_span.resolved_at(context.span()));
1012
1013 let singletons_resolved =
1014 self.helper_resolve_singletons(node_id, op_span);
1015 let arguments = &process_singletons::postprocess_singletons(
1016 op_inst.arguments_raw.clone(),
1017 singletons_resolved.clone(),
1018 context,
1019 );
1020 let arguments_handles =
1021 &process_singletons::postprocess_singletons_handles(
1022 op_inst.arguments_raw.clone(),
1023 singletons_resolved.clone(),
1024 );
1025
1026 let source_tag = 'a: {
1027 if let Some(tag) = self.operator_tag.get(node_id).cloned() {
1028 break 'a tag;
1029 }
1030
1031 #[cfg(nightly)]
1032 if proc_macro::is_available() {
1033 let op_span = op_span.unwrap();
1034 break 'a format!(
1035 "loc_{}_{}_{}_{}_{}",
1036 crate::pretty_span::make_source_path_relative(
1037 &op_span.file()
1038 )
1039 .display()
1040 .to_string()
1041 .replace(|x: char| !x.is_ascii_alphanumeric(), "_"),
1042 op_span.start().line(),
1043 op_span.start().column(),
1044 op_span.end().line(),
1045 op_span.end().column(),
1046 );
1047 }
1048
1049 format!(
1050 "loc_nopath_{}_{}_{}_{}",
1051 op_span.start().line,
1052 op_span.start().column,
1053 op_span.end().line,
1054 op_span.end().column
1055 )
1056 };
1057
1058 let work_fn = format_ident!(
1059 "{}__{}__{}",
1060 ident,
1061 op_name,
1062 source_tag,
1063 span = op_span
1064 );
1065 let work_fn_async = format_ident!("{}__async", work_fn, span = op_span);
1066
1067 let context_args = WriteContextArgs {
1068 root: &root,
1069 df_ident: df_local,
1070 context,
1071 subgraph_id,
1072 node_id,
1073 loop_id,
1074 op_span,
1075 op_tag: self.operator_tag.get(node_id).cloned(),
1076 work_fn: &work_fn,
1077 work_fn_async: &work_fn_async,
1078 ident: &ident,
1079 is_pull,
1080 inputs: &inputs,
1081 outputs: &outputs,
1082 singleton_output_ident,
1083 op_name,
1084 op_inst,
1085 arguments,
1086 arguments_handles,
1087 };
1088
1089 let write_result =
1090 (op_constraints.write_fn)(&context_args, diagnostics);
1091 let OperatorWriteOutput {
1092 write_prologue,
1093 write_prologue_after,
1094 write_iterator,
1095 write_iterator_after,
1096 } = write_result.unwrap_or_else(|()| {
1097 assert!(
1098 diagnostics.has_error(),
1099 "Operator `{}` returned `Err` but emitted no diagnostics, this is a bug.",
1100 op_name,
1101 );
1102 OperatorWriteOutput { write_iterator: null_write_iterator_fn(&context_args), ..Default::default() }
1103 });
1104
1105 op_prologue_code.push(syn::parse_quote! {
1106 #[allow(non_snake_case)]
1107 #[inline(always)]
1108 fn #work_fn<T>(thunk: impl ::std::ops::FnOnce() -> T) -> T {
1109 thunk()
1110 }
1111
1112 #[allow(non_snake_case)]
1113 #[inline(always)]
1114 async fn #work_fn_async<T>(thunk: impl ::std::future::Future<Output = T>) -> T {
1115 thunk.await
1116 }
1117 });
1118 op_prologue_code.push(write_prologue);
1119 op_prologue_after_code.push(write_prologue_after);
1120 subgraph_op_iter_code.push(write_iterator);
1121
1122 if include_type_guards {
1123 let type_guard = if is_pull {
1124 quote_spanned! {op_span=>
1125 let #ident = {
1126 #[allow(non_snake_case)]
1127 #[inline(always)]
1128 pub fn #work_fn<Item, Input>(input: Input)
1129 -> impl #root::dfir_pipes::pull::Pull<Item = Item, Meta = (), CanPend = Input::CanPend, CanEnd = Input::CanEnd>
1130 where
1131 Input: #root::dfir_pipes::pull::Pull<Item = Item, Meta = ()>,
1132 {
1133 #root::pin_project_lite::pin_project! {
1134 #[repr(transparent)]
1135 struct Pull<Item, Input: #root::dfir_pipes::pull::Pull<Item = Item>> {
1136 #[pin]
1137 inner: Input
1138 }
1139 }
1140
1141 impl<Item, Input> #root::dfir_pipes::pull::Pull for Pull<Item, Input>
1142 where
1143 Input: #root::dfir_pipes::pull::Pull<Item = Item>,
1144 {
1145 type Ctx<'ctx> = Input::Ctx<'ctx>;
1146
1147 type Item = Item;
1148 type Meta = Input::Meta;
1149 type CanPend = Input::CanPend;
1150 type CanEnd = Input::CanEnd;
1151
1152 #[inline(always)]
1153 fn pull(
1154 self: ::std::pin::Pin<&mut Self>,
1155 ctx: &mut Self::Ctx<'_>,
1156 ) -> #root::dfir_pipes::pull::PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
1157 #root::dfir_pipes::pull::Pull::pull(self.project().inner, ctx)
1158 }
1159
1160 #[inline(always)]
1161 fn size_hint(&self) -> (usize, Option<usize>) {
1162 #root::dfir_pipes::pull::Pull::size_hint(&self.inner)
1163 }
1164 }
1165
1166 Pull {
1167 inner: input
1168 }
1169 }
1170 #work_fn::<_, _>( #ident )
1171 };
1172 }
1173 } else {
1174 quote_spanned! {op_span=>
1175 let #ident = {
1176 #[allow(non_snake_case)]
1177 #[inline(always)]
1178 pub fn #work_fn<Item, Psh>(psh: Psh) -> impl #root::dfir_pipes::push::Push<Item, (), CanPend = Psh::CanPend>
1179 where
1180 Psh: #root::dfir_pipes::push::Push<Item, ()>
1181 {
1182 #root::pin_project_lite::pin_project! {
1183 #[repr(transparent)]
1184 struct PushGuard<Psh> {
1185 #[pin]
1186 inner: Psh,
1187 }
1188 }
1189
1190 impl<Item, Psh> #root::dfir_pipes::push::Push<Item, ()> for PushGuard<Psh>
1191 where
1192 Psh: #root::dfir_pipes::push::Push<Item, ()>,
1193 {
1194 type Ctx<'ctx> = Psh::Ctx<'ctx>;
1195
1196 type CanPend = Psh::CanPend;
1197
1198 #[inline(always)]
1199 fn poll_ready(
1200 self: ::std::pin::Pin<&mut Self>,
1201 ctx: &mut Self::Ctx<'_>,
1202 ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1203 #root::dfir_pipes::push::Push::poll_ready(self.project().inner, ctx)
1204 }
1205
1206 #[inline(always)]
1207 fn start_send(
1208 self: ::std::pin::Pin<&mut Self>,
1209 item: Item,
1210 meta: (),
1211 ) {
1212 #root::dfir_pipes::push::Push::start_send(self.project().inner, item, meta)
1213 }
1214
1215 #[inline(always)]
1216 fn poll_flush(
1217 self: ::std::pin::Pin<&mut Self>,
1218 ctx: &mut Self::Ctx<'_>,
1219 ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1220 #root::dfir_pipes::push::Push::poll_flush(self.project().inner, ctx)
1221 }
1222
1223 #[inline(always)]
1224 fn size_hint(
1225 self: ::std::pin::Pin<&mut Self>,
1226 hint: (usize, Option<usize>),
1227 ) {
1228 #root::dfir_pipes::push::Push::size_hint(self.project().inner, hint)
1229 }
1230 }
1231
1232 PushGuard {
1233 inner: psh
1234 }
1235 }
1236 #work_fn( #ident )
1237 };
1238 }
1239 };
1240 subgraph_op_iter_code.push(type_guard);
1241 }
1242 subgraph_op_iter_after_code.push(write_iterator_after);
1243 }
1244 }
1245
1246 {
1247 let pull_ident = if 0 < pull_to_push_idx {
1249 self.node_as_ident(subgraph_nodes[pull_to_push_idx - 1], false)
1250 } else {
1251 recv_ports[0].clone()
1253 };
1254
1255 #[rustfmt::skip]
1256 let push_ident = if let Some(&node_id) =
1257 subgraph_nodes.get(pull_to_push_idx)
1258 {
1259 self.node_as_ident(node_id, false)
1260 } else if 1 == send_ports.len() {
1261 send_ports[0].clone()
1263 } else {
1264 diagnostics.push(Diagnostic::spanned(
1265 pull_ident.span(),
1266 Level::Error,
1267 "Degenerate subgraph detected, is there a disconnected `null()` or other degenerate pipeline somewhere?",
1268 ));
1269 continue;
1270 };
1271
1272 let pivot_span = pull_ident
1274 .span()
1275 .join(push_ident.span())
1276 .unwrap_or_else(|| push_ident.span());
1277 let pivot_fn_ident =
1278 Ident::new(&format!("pivot_run_sg_{:?}", subgraph_id.0), pivot_span);
1279 let root = change_spans(root.clone(), pivot_span);
1280 subgraph_op_iter_code.push(quote_spanned! {pivot_span=>
1281 #[inline(always)]
1282 fn #pivot_fn_ident<Pul, Psh, Item>(pull: Pul, push: Psh)
1283 -> impl ::std::future::Future<Output = ()>
1284 where
1285 Pul: #root::dfir_pipes::pull::Pull<Item = Item>,
1286 Psh: #root::dfir_pipes::push::Push<Item, Pul::Meta>,
1287 {
1288 #root::dfir_pipes::pull::Pull::send_push(pull, push)
1289 }
1290 (#pivot_fn_ident)(#pull_ident, #push_ident).await;
1291 });
1292 }
1293 };
1294
1295 let subgraph_name = Literal::string(&format!("Subgraph {:?}", subgraph_id));
1296 let stratum = Literal::usize_unsuffixed(
1297 self.subgraph_stratum.get(subgraph_id).cloned().unwrap_or(0),
1298 );
1299 let laziness = self.subgraph_laziness(subgraph_id);
1300
1301 let loop_id_opt = loop_id
1303 .map(|loop_id| loop_id.as_ident(Span::call_site()))
1304 .map(|ident| quote! { Some(#ident) })
1305 .unwrap_or_else(|| quote! { None });
1306
1307 let sg_ident = subgraph_id.as_ident(Span::call_site());
1308
1309 subgraphs.push(quote! {
1310 let #sg_ident = #df.add_subgraph_full(
1311 #subgraph_name,
1312 #stratum,
1313 var_expr!( #( #recv_ports ),* ),
1314 var_expr!( #( #send_ports ),* ),
1315 #laziness,
1316 #loop_id_opt,
1317 async move |#context, var_args!( #( #recv_ports ),* ), var_args!( #( #send_ports ),* )| {
1318 #( #recv_port_code )*
1319 #( #send_port_code )*
1320 #( #subgraph_op_iter_code )*
1321 #( #subgraph_op_iter_after_code )*
1322 },
1323 );
1324 });
1325 }
1326 }
1327
1328 if diagnostics.has_error() {
1329 return Err(std::mem::take(diagnostics));
1330 }
1331 let _ = diagnostics; let loop_code = self.codegen_nested_loops(&df);
1334
1335 let code = quote! {
1340 #( #handoff_code )*
1341 #loop_code
1342 #( #op_prologue_code )*
1343 #( #subgraphs )*
1344 #( #op_prologue_after_code )*
1345 };
1346
1347 let meta_graph_json = serde_json::to_string(&self).unwrap();
1348 let meta_graph_json = Literal::string(&meta_graph_json);
1349
1350 let serde_diagnostics: Vec<_> = diagnostics.iter().map(Diagnostic::to_serde).collect();
1351 let diagnostics_json = serde_json::to_string(&*serde_diagnostics).unwrap();
1352 let diagnostics_json = Literal::string(&diagnostics_json);
1353
1354 Ok(quote! {
1355 {
1356 #[allow(unused_qualifications, clippy::await_holding_refcell_ref)]
1357 {
1358 #prefix
1359
1360 use #root::{var_expr, var_args};
1361
1362 let mut #df = #root::scheduled::graph::Dfir::new();
1363 #df.__assign_meta_graph(#meta_graph_json);
1364 #df.__assign_diagnostics(#diagnostics_json);
1365
1366 #code
1367
1368 #df
1369 }
1370 }
1371 })
1372 }
1373
1374 pub fn as_code_inline(
1386 &self,
1387 root: &TokenStream,
1388 include_type_guards: bool,
1389 prefix: TokenStream,
1390 diagnostics: &mut Diagnostics,
1391 ) -> Result<TokenStream, Diagnostics> {
1392 let df = Ident::new(GRAPH, Span::call_site());
1393 let context = Ident::new(CONTEXT, Span::call_site());
1394
1395 let buffer_code: Vec<TokenStream> = self
1397 .nodes
1398 .iter()
1399 .filter_map(|(node_id, node)| match node {
1400 GraphNode::Operator(_) => None,
1401 &GraphNode::Handoff { src_span, dst_span } => Some((node_id, (src_span, dst_span))),
1402 GraphNode::ModuleBoundary { .. } => panic!(),
1403 })
1404 .map(|(node_id, (src_span, dst_span))| {
1405 let span = src_span.join(dst_span).unwrap_or(src_span);
1406 let buf_ident = Ident::new(&format!("hoff_{:?}_buf", node_id.data()), span);
1407 quote_spanned! {span=>
1408 let mut #buf_ident: Vec<_> = Vec::new();
1409 }
1410 })
1411 .collect();
1412
1413 let subgraph_handoffs = self.helper_collect_subgraph_handoffs();
1415
1416 let mut all_subgraphs: Vec<_> = self.subgraph_nodes.iter().collect();
1418 all_subgraphs.sort_by_key(|&(sg_id, nodes)| {
1419 let stratum = self.subgraph_stratum.get(sg_id).copied().unwrap_or(0);
1420 let is_source = nodes
1421 .iter()
1422 .any(|&node_id| self.node_degree_in(node_id) == 0);
1423 (stratum, !is_source)
1424 });
1425
1426 let mut op_prologue_code = Vec::new();
1427 let mut op_prologue_after_code = Vec::new();
1428 let mut subgraph_blocks = Vec::new();
1429 {
1430 for &(subgraph_id, subgraph_nodes) in all_subgraphs.iter() {
1431 let (recv_hoffs, send_hoffs) = &subgraph_handoffs[subgraph_id];
1432
1433 let recv_port_idents: Vec<Ident> = recv_hoffs
1435 .iter()
1436 .map(|&hoff_id| self.node_as_ident(hoff_id, true))
1437 .collect();
1438 let send_port_idents: Vec<Ident> = send_hoffs
1439 .iter()
1440 .map(|&hoff_id| self.node_as_ident(hoff_id, false))
1441 .collect();
1442
1443 let recv_buf_idents: Vec<Ident> = recv_hoffs
1445 .iter()
1446 .map(|&hoff_id| {
1447 let span = self.nodes[hoff_id].span();
1448 Ident::new(&format!("hoff_{:?}_buf", hoff_id.data()), span)
1449 })
1450 .collect();
1451 let send_buf_idents: Vec<Ident> = send_hoffs
1452 .iter()
1453 .map(|&hoff_id| {
1454 let span = self.nodes[hoff_id].span();
1455 Ident::new(&format!("hoff_{:?}_buf", hoff_id.data()), span)
1456 })
1457 .collect();
1458
1459 let recv_port_code: Vec<TokenStream> = recv_port_idents
1461 .iter()
1462 .zip(recv_buf_idents.iter())
1463 .map(|(port_ident, buf_ident)| {
1464 quote_spanned! {port_ident.span()=>
1465 let #port_ident = #root::dfir_pipes::pull::iter(#buf_ident.drain(..));
1466 }
1467 })
1468 .collect();
1469
1470 let send_port_code: Vec<TokenStream> = send_port_idents
1472 .iter()
1473 .zip(send_buf_idents.iter())
1474 .map(|(port_ident, buf_ident)| {
1475 quote_spanned! {port_ident.span()=>
1476 let #port_ident = #root::dfir_pipes::push::vec_push(&mut #buf_ident);
1477 }
1478 })
1479 .collect();
1480
1481 let loop_id = self.node_loop(subgraph_nodes[0]);
1483
1484 let mut subgraph_op_iter_code = Vec::new();
1485 let mut subgraph_op_iter_after_code = Vec::new();
1486 {
1487 let pull_to_push_idx = self.find_pull_to_push_idx(subgraph_nodes);
1488
1489 let (pull_half, push_half) = subgraph_nodes.split_at(pull_to_push_idx);
1490 let nodes_iter = pull_half.iter().chain(push_half.iter().rev());
1491
1492 for (idx, &node_id) in nodes_iter.enumerate() {
1493 let node = &self.nodes[node_id];
1494 assert!(
1495 matches!(node, GraphNode::Operator(_)),
1496 "Handoffs are not part of subgraphs."
1497 );
1498 let op_inst = &self.operator_instances[node_id];
1499
1500 let op_span = node.span();
1501 let op_name = op_inst.op_constraints.name;
1502 let root = change_spans(root.clone(), op_span);
1504 let op_constraints = OPERATORS
1505 .iter()
1506 .find(|op| op_name == op.name)
1507 .unwrap_or_else(|| panic!("Failed to find op: {}", op_name));
1508
1509 let ident = self.node_as_ident(node_id, false);
1510
1511 {
1512 let mut input_edges = self
1515 .graph
1516 .predecessor_edges(node_id)
1517 .map(|edge_id| (self.edge_ports(edge_id).1, edge_id))
1518 .collect::<Vec<_>>();
1519 input_edges.sort();
1521
1522 let inputs = input_edges
1523 .iter()
1524 .map(|&(_port, edge_id)| {
1525 let (pred, _) = self.edge(edge_id);
1526 self.node_as_ident(pred, true)
1527 })
1528 .collect::<Vec<_>>();
1529
1530 let mut output_edges = self
1532 .graph
1533 .successor_edges(node_id)
1534 .map(|edge_id| (&self.ports[edge_id].0, edge_id))
1535 .collect::<Vec<_>>();
1536 output_edges.sort();
1538
1539 let outputs = output_edges
1540 .iter()
1541 .map(|&(_port, edge_id)| {
1542 let (_, succ) = self.edge(edge_id);
1543 self.node_as_ident(succ, false)
1544 })
1545 .collect::<Vec<_>>();
1546
1547 let is_pull = idx < pull_to_push_idx;
1548
1549 let singleton_output_ident = &if op_constraints.has_singleton_output {
1550 self.node_as_singleton_ident(node_id, op_span)
1551 } else {
1552 Ident::new(&format!("{}_has_no_singleton_output", op_name), op_span)
1554 };
1555
1556 let df_local = &Ident::new(GRAPH, op_span.resolved_at(df.span()));
1565 let context = &Ident::new(CONTEXT, op_span.resolved_at(context.span()));
1566
1567 let singletons_resolved =
1568 self.helper_resolve_singletons(node_id, op_span);
1569 let arguments = &process_singletons::postprocess_singletons(
1570 op_inst.arguments_raw.clone(),
1571 singletons_resolved.clone(),
1572 context,
1573 );
1574 let arguments_handles =
1575 &process_singletons::postprocess_singletons_handles(
1576 op_inst.arguments_raw.clone(),
1577 singletons_resolved.clone(),
1578 );
1579
1580 let source_tag = 'a: {
1581 if let Some(tag) = self.operator_tag.get(node_id).cloned() {
1582 break 'a tag;
1583 }
1584
1585 #[cfg(nightly)]
1586 if proc_macro::is_available() {
1587 let op_span = op_span.unwrap();
1588 break 'a format!(
1589 "loc_{}_{}_{}_{}_{}",
1590 crate::pretty_span::make_source_path_relative(
1591 &op_span.file()
1592 )
1593 .display()
1594 .to_string()
1595 .replace(|x: char| !x.is_ascii_alphanumeric(), "_"),
1596 op_span.start().line(),
1597 op_span.start().column(),
1598 op_span.end().line(),
1599 op_span.end().column(),
1600 );
1601 }
1602
1603 format!(
1604 "loc_nopath_{}_{}_{}_{}",
1605 op_span.start().line,
1606 op_span.start().column,
1607 op_span.end().line,
1608 op_span.end().column
1609 )
1610 };
1611
1612 let work_fn = format_ident!(
1613 "{}__{}__{}",
1614 ident,
1615 op_name,
1616 source_tag,
1617 span = op_span
1618 );
1619 let work_fn_async = format_ident!("{}__async", work_fn, span = op_span);
1620
1621 let context_args = WriteContextArgs {
1622 root: &root,
1623 df_ident: df_local,
1624 context,
1625 subgraph_id,
1626 node_id,
1627 loop_id,
1628 op_span,
1629 op_tag: self.operator_tag.get(node_id).cloned(),
1630 work_fn: &work_fn,
1631 work_fn_async: &work_fn_async,
1632 ident: &ident,
1633 is_pull,
1634 inputs: &inputs,
1635 outputs: &outputs,
1636 singleton_output_ident,
1637 op_name,
1638 op_inst,
1639 arguments,
1640 arguments_handles,
1641 };
1642
1643 let write_result =
1644 (op_constraints.write_fn)(&context_args, diagnostics);
1645 let OperatorWriteOutput {
1646 write_prologue,
1647 write_prologue_after,
1648 write_iterator,
1649 write_iterator_after,
1650 } = write_result.unwrap_or_else(|()| {
1651 assert!(
1652 diagnostics.has_error(),
1653 "Operator `{}` returned `Err` but emitted no diagnostics, this is a bug.",
1654 op_name,
1655 );
1656 OperatorWriteOutput {
1657 write_iterator: null_write_iterator_fn(&context_args),
1658 ..Default::default()
1659 }
1660 });
1661
1662 op_prologue_code.push(syn::parse_quote! {
1663 #[allow(non_snake_case)]
1664 #[inline(always)]
1665 fn #work_fn<T>(thunk: impl ::std::ops::FnOnce() -> T) -> T {
1666 thunk()
1667 }
1668
1669 #[allow(non_snake_case)]
1670 #[inline(always)]
1671 async fn #work_fn_async<T>(
1672 thunk: impl ::std::future::Future<Output = T>,
1673 ) -> T {
1674 thunk.await
1675 }
1676 });
1677 op_prologue_code.push(write_prologue);
1678 op_prologue_after_code.push(write_prologue_after);
1679 subgraph_op_iter_code.push(write_iterator);
1680
1681 if include_type_guards {
1682 let type_guard = if is_pull {
1683 quote_spanned! {op_span=>
1684 let #ident = {
1685 #[allow(non_snake_case)]
1686 #[inline(always)]
1687 pub fn #work_fn<Item, Input>(input: Input)
1688 -> impl #root::dfir_pipes::pull::Pull<Item = Item, Meta = (), CanPend = Input::CanPend, CanEnd = Input::CanEnd>
1689 where
1690 Input: #root::dfir_pipes::pull::Pull<Item = Item, Meta = ()>,
1691 {
1692 #root::pin_project_lite::pin_project! {
1693 #[repr(transparent)]
1694 struct Pull<Item, Input: #root::dfir_pipes::pull::Pull<Item = Item>> {
1695 #[pin]
1696 inner: Input
1697 }
1698 }
1699
1700 impl<Item, Input> #root::dfir_pipes::pull::Pull for Pull<Item, Input>
1701 where
1702 Input: #root::dfir_pipes::pull::Pull<Item = Item>,
1703 {
1704 type Ctx<'ctx> = Input::Ctx<'ctx>;
1705
1706 type Item = Item;
1707 type Meta = Input::Meta;
1708 type CanPend = Input::CanPend;
1709 type CanEnd = Input::CanEnd;
1710
1711 #[inline(always)]
1712 fn pull(
1713 self: ::std::pin::Pin<&mut Self>,
1714 ctx: &mut Self::Ctx<'_>,
1715 ) -> #root::dfir_pipes::pull::PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
1716 #root::dfir_pipes::pull::Pull::pull(self.project().inner, ctx)
1717 }
1718
1719 #[inline(always)]
1720 fn size_hint(&self) -> (usize, Option<usize>) {
1721 #root::dfir_pipes::pull::Pull::size_hint(&self.inner)
1722 }
1723 }
1724
1725 Pull {
1726 inner: input
1727 }
1728 }
1729 #work_fn::<_, _>( #ident )
1730 };
1731 }
1732 } else {
1733 quote_spanned! {op_span=>
1734 let #ident = {
1735 #[allow(non_snake_case)]
1736 #[inline(always)]
1737 pub fn #work_fn<Item, Psh>(psh: Psh) -> impl #root::dfir_pipes::push::Push<Item, (), CanPend = Psh::CanPend>
1738 where
1739 Psh: #root::dfir_pipes::push::Push<Item, ()>
1740 {
1741 #root::pin_project_lite::pin_project! {
1742 #[repr(transparent)]
1743 struct PushGuard<Psh> {
1744 #[pin]
1745 inner: Psh,
1746 }
1747 }
1748
1749 impl<Item, Psh> #root::dfir_pipes::push::Push<Item, ()> for PushGuard<Psh>
1750 where
1751 Psh: #root::dfir_pipes::push::Push<Item, ()>,
1752 {
1753 type Ctx<'ctx> = Psh::Ctx<'ctx>;
1754
1755 type CanPend = Psh::CanPend;
1756
1757 #[inline(always)]
1758 fn poll_ready(
1759 self: ::std::pin::Pin<&mut Self>,
1760 ctx: &mut Self::Ctx<'_>,
1761 ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1762 #root::dfir_pipes::push::Push::poll_ready(self.project().inner, ctx)
1763 }
1764
1765 #[inline(always)]
1766 fn start_send(
1767 self: ::std::pin::Pin<&mut Self>,
1768 item: Item,
1769 meta: (),
1770 ) {
1771 #root::dfir_pipes::push::Push::start_send(self.project().inner, item, meta)
1772 }
1773
1774 #[inline(always)]
1775 fn poll_flush(
1776 self: ::std::pin::Pin<&mut Self>,
1777 ctx: &mut Self::Ctx<'_>,
1778 ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1779 #root::dfir_pipes::push::Push::poll_flush(self.project().inner, ctx)
1780 }
1781
1782 #[inline(always)]
1783 fn size_hint(
1784 self: ::std::pin::Pin<&mut Self>,
1785 hint: (usize, Option<usize>),
1786 ) {
1787 #root::dfir_pipes::push::Push::size_hint(self.project().inner, hint)
1788 }
1789 }
1790
1791 PushGuard {
1792 inner: psh
1793 }
1794 }
1795 #work_fn( #ident )
1796 };
1797 }
1798 };
1799 subgraph_op_iter_code.push(type_guard);
1800 }
1801 subgraph_op_iter_after_code.push(write_iterator_after);
1802 }
1803 }
1804
1805 {
1806 let pull_ident = if 0 < pull_to_push_idx {
1808 self.node_as_ident(subgraph_nodes[pull_to_push_idx - 1], false)
1809 } else {
1810 recv_port_idents[0].clone()
1812 };
1813
1814 #[rustfmt::skip]
1815 let push_ident = if let Some(&node_id) =
1816 subgraph_nodes.get(pull_to_push_idx)
1817 {
1818 self.node_as_ident(node_id, false)
1819 } else if 1 == send_port_idents.len() {
1820 send_port_idents[0].clone()
1822 } else {
1823 diagnostics.push(Diagnostic::spanned(
1824 pull_ident.span(),
1825 Level::Error,
1826 "Degenerate subgraph detected, is there a disconnected `null()` or other degenerate pipeline somewhere?",
1827 ));
1828 continue;
1829 };
1830
1831 let pivot_span = pull_ident
1833 .span()
1834 .join(push_ident.span())
1835 .unwrap_or_else(|| push_ident.span());
1836 let pivot_fn_ident =
1837 Ident::new(&format!("pivot_run_sg_{:?}", subgraph_id.0), pivot_span);
1838 let root = change_spans(root.clone(), pivot_span);
1839 subgraph_op_iter_code.push(quote_spanned! {pivot_span=>
1840 #[inline(always)]
1841 fn #pivot_fn_ident<Pul, Psh, Item>(pull: Pul, push: Psh)
1842 -> impl ::std::future::Future<Output = ()>
1843 where
1844 Pul: #root::dfir_pipes::pull::Pull<Item = Item>,
1845 Psh: #root::dfir_pipes::push::Push<Item, Pul::Meta>,
1846 {
1847 #root::dfir_pipes::pull::Pull::send_push(pull, push)
1848 }
1849 (#pivot_fn_ident)(#pull_ident, #push_ident).await;
1850 });
1851 }
1852 };
1853
1854 let sg_ident = subgraph_id.as_ident(Span::call_site());
1856
1857 subgraph_blocks.push(quote! {
1859 let #sg_ident: #root::scheduled::SubgraphId = #root::util::slot_vec::Key::from_raw(0);
1860 {
1861 let #context = &#df;;
1862 #( #recv_port_code )*
1863 #( #send_port_code )*
1864 #( #subgraph_op_iter_code )*
1865 #( #subgraph_op_iter_after_code )*
1866 }
1867 });
1868
1869 }
1872 }
1873
1874 if diagnostics.has_error() {
1875 return Err(std::mem::take(diagnostics));
1876 }
1877 let _ = diagnostics; Ok(quote! {
1881 {
1882 #prefix
1883
1884 use #root::{var_expr, var_args};
1885
1886 #[allow(unused_mut)]
1887 let mut #df = #root::scheduled::context::InlineContext::new();
1888
1889 #( #buffer_code )*
1890 #( #op_prologue_code )*
1891 #( #op_prologue_after_code )*
1892
1893 #[allow(unused_qualifications, unused_mut, unused_variables, clippy::await_holding_refcell_ref)]
1894 let mut __dfir_inline_closure = async move || {
1895 #( #subgraph_blocks )*
1896
1897 #df.__end_tick();
1898 };
1899 __dfir_inline_closure
1900 }
1901 })
1902 }
1903
1904 pub fn node_color_map(&self) -> SparseSecondaryMap<GraphNodeId, Color> {
1907 let mut node_color_map: SparseSecondaryMap<GraphNodeId, Color> = self
1908 .node_ids()
1909 .filter_map(|node_id| {
1910 let op_color = self.node_color(node_id)?;
1911 Some((node_id, op_color))
1912 })
1913 .collect();
1914
1915 for sg_nodes in self.subgraph_nodes.values() {
1917 let pull_to_push_idx = self.find_pull_to_push_idx(sg_nodes);
1918
1919 for (idx, node_id) in sg_nodes.iter().copied().enumerate() {
1920 let is_pull = idx < pull_to_push_idx;
1921 node_color_map.insert(node_id, if is_pull { Color::Pull } else { Color::Push });
1922 }
1923 }
1924
1925 node_color_map
1926 }
1927
1928 pub fn to_mermaid(&self, write_config: &WriteConfig) -> String {
1930 let mut output = String::new();
1931 self.write_mermaid(&mut output, write_config).unwrap();
1932 output
1933 }
1934
1935 pub fn write_mermaid(
1937 &self,
1938 output: impl std::fmt::Write,
1939 write_config: &WriteConfig,
1940 ) -> std::fmt::Result {
1941 let mut graph_write = Mermaid::new(output);
1942 self.write_graph(&mut graph_write, write_config)
1943 }
1944
1945 pub fn to_dot(&self, write_config: &WriteConfig) -> String {
1947 let mut output = String::new();
1948 let mut graph_write = Dot::new(&mut output);
1949 self.write_graph(&mut graph_write, write_config).unwrap();
1950 output
1951 }
1952
1953 pub fn write_dot(
1955 &self,
1956 output: impl std::fmt::Write,
1957 write_config: &WriteConfig,
1958 ) -> std::fmt::Result {
1959 let mut graph_write = Dot::new(output);
1960 self.write_graph(&mut graph_write, write_config)
1961 }
1962
1963 pub(crate) fn write_graph<W>(
1965 &self,
1966 mut graph_write: W,
1967 write_config: &WriteConfig,
1968 ) -> Result<(), W::Err>
1969 where
1970 W: GraphWrite,
1971 {
1972 fn helper_edge_label(
1973 src_port: &PortIndexValue,
1974 dst_port: &PortIndexValue,
1975 ) -> Option<String> {
1976 let src_label = match src_port {
1977 PortIndexValue::Path(path) => Some(path.to_token_stream().to_string()),
1978 PortIndexValue::Int(index) => Some(index.value.to_string()),
1979 _ => None,
1980 };
1981 let dst_label = match dst_port {
1982 PortIndexValue::Path(path) => Some(path.to_token_stream().to_string()),
1983 PortIndexValue::Int(index) => Some(index.value.to_string()),
1984 _ => None,
1985 };
1986 let label = match (src_label, dst_label) {
1987 (Some(l1), Some(l2)) => Some(format!("{}\n{}", l1, l2)),
1988 (Some(l1), None) => Some(l1),
1989 (None, Some(l2)) => Some(l2),
1990 (None, None) => None,
1991 };
1992 label
1993 }
1994
1995 let node_color_map = self.node_color_map();
1997
1998 graph_write.write_prologue()?;
2000
2001 let mut skipped_handoffs = BTreeSet::new();
2003 let mut subgraph_handoffs = <BTreeMap<GraphSubgraphId, Vec<GraphNodeId>>>::new();
2004 for (node_id, node) in self.nodes() {
2005 if matches!(node, GraphNode::Handoff { .. }) {
2006 if write_config.no_handoffs {
2007 skipped_handoffs.insert(node_id);
2008 continue;
2009 } else {
2010 let pred_node = self.node_predecessor_nodes(node_id).next().unwrap();
2011 let pred_sg = self.node_subgraph(pred_node);
2012 let succ_node = self.node_successor_nodes(node_id).next().unwrap();
2013 let succ_sg = self.node_subgraph(succ_node);
2014 if let Some((pred_sg, succ_sg)) = pred_sg.zip(succ_sg)
2015 && pred_sg == succ_sg
2016 {
2017 subgraph_handoffs.entry(pred_sg).or_default().push(node_id);
2018 }
2019 }
2020 }
2021 graph_write.write_node_definition(
2022 node_id,
2023 &if write_config.op_short_text {
2024 node.to_name_string()
2025 } else if write_config.op_text_no_imports {
2026 let full_text = node.to_pretty_string();
2028 let mut output = String::new();
2029 for sentence in full_text.split('\n') {
2030 if sentence.trim().starts_with("use") {
2031 continue;
2032 }
2033 output.push('\n');
2034 output.push_str(sentence);
2035 }
2036 output.into()
2037 } else {
2038 node.to_pretty_string()
2039 },
2040 if write_config.no_pull_push {
2041 None
2042 } else {
2043 node_color_map.get(node_id).copied()
2044 },
2045 )?;
2046 }
2047
2048 for (edge_id, (src_id, mut dst_id)) in self.edges() {
2050 if skipped_handoffs.contains(&src_id) {
2052 continue;
2053 }
2054
2055 let (src_port, mut dst_port) = self.edge_ports(edge_id);
2056 if skipped_handoffs.contains(&dst_id) {
2057 let mut handoff_succs = self.node_successors(dst_id);
2058 assert_eq!(1, handoff_succs.len());
2059 let (succ_edge, succ_node) = handoff_succs.next().unwrap();
2060 dst_id = succ_node;
2061 dst_port = self.edge_ports(succ_edge).1;
2062 }
2063
2064 let label = helper_edge_label(src_port, dst_port);
2065 let delay_type = self
2066 .node_op_inst(dst_id)
2067 .and_then(|op_inst| (op_inst.op_constraints.input_delaytype_fn)(dst_port));
2068 graph_write.write_edge(src_id, dst_id, delay_type, label.as_deref(), false)?;
2069 }
2070
2071 if !write_config.no_references {
2073 for dst_id in self.node_ids() {
2074 for src_ref_id in self
2075 .node_singleton_references(dst_id)
2076 .iter()
2077 .copied()
2078 .flatten()
2079 {
2080 let delay_type = Some(DelayType::Stratum);
2081 let label = None;
2082 graph_write.write_edge(src_ref_id, dst_id, delay_type, label, true)?;
2083 }
2084 }
2085 }
2086
2087 let loop_subgraphs = self.subgraph_ids().map(|sg_id| {
2098 let loop_id = if write_config.no_loops {
2099 None
2100 } else {
2101 self.subgraph_loop(sg_id)
2102 };
2103 (loop_id, sg_id)
2104 });
2105 let loop_subgraphs = into_group_map(loop_subgraphs);
2106 for (loop_id, subgraph_ids) in loop_subgraphs {
2107 if let Some(loop_id) = loop_id {
2108 graph_write.write_loop_start(loop_id)?;
2109 }
2110
2111 let subgraph_varnames_nodes = subgraph_ids.into_iter().flat_map(|sg_id| {
2113 self.subgraph(sg_id).iter().copied().map(move |node_id| {
2114 let opt_sg_id = if write_config.no_subgraphs {
2115 None
2116 } else {
2117 Some(sg_id)
2118 };
2119 (opt_sg_id, (self.node_varname(node_id), node_id))
2120 })
2121 });
2122 let subgraph_varnames_nodes = into_group_map(subgraph_varnames_nodes);
2123 for (sg_id, varnames) in subgraph_varnames_nodes {
2124 if let Some(sg_id) = sg_id {
2125 let stratum = self.subgraph_stratum(sg_id).unwrap();
2126 graph_write.write_subgraph_start(sg_id, stratum)?;
2127 }
2128
2129 let varname_nodes = varnames.into_iter().map(|(varname, node)| {
2131 let varname = if write_config.no_varnames {
2132 None
2133 } else {
2134 varname
2135 };
2136 (varname, node)
2137 });
2138 let varname_nodes = into_group_map(varname_nodes);
2139 for (varname, node_ids) in varname_nodes {
2140 if let Some(varname) = varname {
2141 graph_write.write_varname_start(&varname.0.to_string(), sg_id)?;
2142 }
2143
2144 for node_id in node_ids {
2146 graph_write.write_node(node_id)?;
2147 }
2148
2149 if varname.is_some() {
2150 graph_write.write_varname_end()?;
2151 }
2152 }
2153
2154 if sg_id.is_some() {
2155 graph_write.write_subgraph_end()?;
2156 }
2157 }
2158
2159 if loop_id.is_some() {
2160 graph_write.write_loop_end()?;
2161 }
2162 }
2163
2164 graph_write.write_epilogue()?;
2166
2167 Ok(())
2168 }
2169
2170 pub fn surface_syntax_string(&self) -> String {
2172 let mut string = String::new();
2173 self.write_surface_syntax(&mut string).unwrap();
2174 string
2175 }
2176
2177 pub fn write_surface_syntax(&self, write: &mut impl std::fmt::Write) -> std::fmt::Result {
2179 for (key, node) in self.nodes.iter() {
2180 match node {
2181 GraphNode::Operator(op) => {
2182 writeln!(write, "{:?} = {};", key.data(), op.to_token_stream())?;
2183 }
2184 GraphNode::Handoff { .. } => {
2185 writeln!(write, "// {:?} = <handoff>;", key.data())?;
2186 }
2187 GraphNode::ModuleBoundary { .. } => panic!(),
2188 }
2189 }
2190 writeln!(write)?;
2191 for (_e, (src_key, dst_key)) in self.graph.edges() {
2192 writeln!(write, "{:?} -> {:?};", src_key.data(), dst_key.data())?;
2193 }
2194 Ok(())
2195 }
2196
2197 pub fn mermaid_string_flat(&self) -> String {
2199 let mut string = String::new();
2200 self.write_mermaid_flat(&mut string).unwrap();
2201 string
2202 }
2203
2204 pub fn write_mermaid_flat(&self, write: &mut impl std::fmt::Write) -> std::fmt::Result {
2206 writeln!(write, "flowchart TB")?;
2207 for (key, node) in self.nodes.iter() {
2208 match node {
2209 GraphNode::Operator(operator) => writeln!(
2210 write,
2211 " %% {span}\n {id:?}[\"{row_col} <tt>{code}</tt>\"]",
2212 span = PrettySpan(node.span()),
2213 id = key.data(),
2214 row_col = PrettyRowCol(node.span()),
2215 code = operator
2216 .to_token_stream()
2217 .to_string()
2218 .replace('&', "&")
2219 .replace('<', "<")
2220 .replace('>', ">")
2221 .replace('"', """)
2222 .replace('\n', "<br>"),
2223 ),
2224 GraphNode::Handoff { .. } => {
2225 writeln!(write, r#" {:?}{{"{}"}}"#, key.data(), HANDOFF_NODE_STR)
2226 }
2227 GraphNode::ModuleBoundary { .. } => {
2228 writeln!(
2229 write,
2230 r#" {:?}{{"{}"}}"#,
2231 key.data(),
2232 MODULE_BOUNDARY_NODE_STR
2233 )
2234 }
2235 }?;
2236 }
2237 writeln!(write)?;
2238 for (_e, (src_key, dst_key)) in self.graph.edges() {
2239 writeln!(write, " {:?}-->{:?}", src_key.data(), dst_key.data())?;
2240 }
2241 Ok(())
2242 }
2243}
2244
2245impl DfirGraph {
2247 pub fn loop_ids(&self) -> slotmap::basic::Keys<'_, GraphLoopId, Vec<GraphNodeId>> {
2249 self.loop_nodes.keys()
2250 }
2251
2252 pub fn loops(&self) -> slotmap::basic::Iter<'_, GraphLoopId, Vec<GraphNodeId>> {
2254 self.loop_nodes.iter()
2255 }
2256
2257 pub fn insert_loop(&mut self, parent_loop: Option<GraphLoopId>) -> GraphLoopId {
2259 let loop_id = self.loop_nodes.insert(Vec::new());
2260 self.loop_children.insert(loop_id, Vec::new());
2261 if let Some(parent_loop) = parent_loop {
2262 self.loop_parent.insert(loop_id, parent_loop);
2263 self.loop_children
2264 .get_mut(parent_loop)
2265 .unwrap()
2266 .push(loop_id);
2267 } else {
2268 self.root_loops.push(loop_id);
2269 }
2270 loop_id
2271 }
2272
2273 pub fn node_loop(&self, node_id: GraphNodeId) -> Option<GraphLoopId> {
2275 self.node_loops.get(node_id).copied()
2276 }
2277
2278 pub fn subgraph_loop(&self, subgraph_id: GraphSubgraphId) -> Option<GraphLoopId> {
2280 let &node_id = self.subgraph(subgraph_id).first().unwrap();
2281 let out = self.node_loop(node_id);
2282 debug_assert!(
2283 self.subgraph(subgraph_id)
2284 .iter()
2285 .all(|&node_id| self.node_loop(node_id) == out),
2286 "Subgraph nodes should all have the same loop context."
2287 );
2288 out
2289 }
2290
2291 pub fn loop_parent(&self, loop_id: GraphLoopId) -> Option<GraphLoopId> {
2293 self.loop_parent.get(loop_id).copied()
2294 }
2295
2296 pub fn loop_children(&self, loop_id: GraphLoopId) -> &Vec<GraphLoopId> {
2298 self.loop_children.get(loop_id).unwrap()
2299 }
2300}
2301
2302#[derive(Clone, Debug, Default)]
2304#[cfg_attr(feature = "clap-derive", derive(clap::Args))]
2305pub struct WriteConfig {
2306 #[cfg_attr(feature = "clap-derive", arg(long))]
2308 pub no_subgraphs: bool,
2309 #[cfg_attr(feature = "clap-derive", arg(long))]
2311 pub no_varnames: bool,
2312 #[cfg_attr(feature = "clap-derive", arg(long))]
2314 pub no_pull_push: bool,
2315 #[cfg_attr(feature = "clap-derive", arg(long))]
2317 pub no_handoffs: bool,
2318 #[cfg_attr(feature = "clap-derive", arg(long))]
2320 pub no_references: bool,
2321 #[cfg_attr(feature = "clap-derive", arg(long))]
2323 pub no_loops: bool,
2324
2325 #[cfg_attr(feature = "clap-derive", arg(long))]
2327 pub op_short_text: bool,
2328 #[cfg_attr(feature = "clap-derive", arg(long))]
2330 pub op_text_no_imports: bool,
2331}
2332
2333#[derive(Copy, Clone, Debug)]
2335#[cfg_attr(feature = "clap-derive", derive(clap::Parser, clap::ValueEnum))]
2336pub enum WriteGraphType {
2337 Mermaid,
2339 Dot,
2341}
2342
2343fn into_group_map<K, V>(iter: impl IntoIterator<Item = (K, V)>) -> BTreeMap<K, Vec<V>>
2345where
2346 K: Ord,
2347{
2348 let mut out: BTreeMap<_, Vec<_>> = BTreeMap::new();
2349 for (k, v) in iter {
2350 out.entry(k).or_default().push(v);
2351 }
2352 out
2353}