Skip to main content

dfir_lang/graph/
meta_graph.rs

1#![warn(missing_docs)]
2
3extern crate proc_macro;
4
5use std::collections::{BTreeMap, BTreeSet, VecDeque};
6use std::fmt::Debug;
7use std::iter::FusedIterator;
8
9use itertools::Itertools;
10use proc_macro2::{Ident, Literal, Span, TokenStream};
11use quote::{ToTokens, TokenStreamExt, format_ident, quote, quote_spanned};
12use serde::{Deserialize, Serialize};
13use slotmap::{Key, SecondaryMap, SlotMap, SparseSecondaryMap};
14use syn::spanned::Spanned;
15
16use super::graph_write::{Dot, GraphWrite, Mermaid};
17use super::ops::{
18    DelayType, OPERATORS, OperatorWriteOutput, WriteContextArgs, find_op_op_constraints,
19    null_write_iterator_fn,
20};
21use super::{
22    CONTEXT, Color, DiMulGraph, GRAPH, GraphEdgeId, GraphLoopId, GraphNode, GraphNodeId,
23    GraphSubgraphId, HANDOFF_NODE_STR, MODULE_BOUNDARY_NODE_STR, OperatorInstance, PortIndexValue,
24    Varname, change_spans, get_operator_generics,
25};
26use crate::diagnostic::{Diagnostic, Diagnostics, Level};
27use crate::pretty_span::{PrettyRowCol, PrettySpan};
28use crate::process_singletons;
29
/// An abstract "meta graph" representation of a DFIR graph.
///
/// Can be with or without subgraph partitioning, stratification, and handoff insertion. This is
/// the meta graph used for generating Rust source code in macros from DFIR syntax.
///
/// This struct has a lot of methods for manipulating the graph, vaguely grouped together in
/// separate `impl` blocks. You might notice a few particularly specific arbitrary-seeming methods
/// in here--those are just what was needed for the compilation algorithms. If you need another
/// method then add it.
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct DfirGraph {
    /// Each node type (operator or handoff).
    nodes: SlotMap<GraphNodeId, GraphNode>,

    /// Instance data corresponding to each operator node.
    /// This field will be empty after deserialization (it is marked `#[serde(skip)]`).
    #[serde(skip)]
    operator_instances: SecondaryMap<GraphNodeId, OperatorInstance>,
    /// Debugging/tracing tag for each operator node.
    operator_tag: SecondaryMap<GraphNodeId, String>,
    /// Graph data structure (two-way adjacency list).
    graph: DiMulGraph<GraphNodeId, GraphEdgeId>,
    /// Ports for each edge, stored as `(source port, destination port)`.
    ports: SecondaryMap<GraphEdgeId, (PortIndexValue, PortIndexValue)>,

    /// Which loop a node belongs to (no entry for top-level nodes).
    node_loops: SecondaryMap<GraphNodeId, GraphLoopId>,
    /// Which nodes belong to each loop.
    loop_nodes: SlotMap<GraphLoopId, Vec<GraphNodeId>>,
    /// For the loop, what is its parent (no entry for top-level loops).
    loop_parent: SparseSecondaryMap<GraphLoopId, GraphLoopId>,
    /// What loops are at the root.
    root_loops: Vec<GraphLoopId>,
    /// For the loop, what are its child loops.
    loop_children: SecondaryMap<GraphLoopId, Vec<GraphLoopId>>,

    /// Which subgraph each node belongs to.
    node_subgraph: SecondaryMap<GraphNodeId, GraphSubgraphId>,

    /// Which nodes belong to each subgraph.
    subgraph_nodes: SlotMap<GraphSubgraphId, Vec<GraphNodeId>>,
    /// Which stratum each subgraph belongs to.
    subgraph_stratum: SecondaryMap<GraphSubgraphId, usize>,

    /// Resolved singleton varname references, per node. A `None` entry is a reference that has
    /// not been resolved to a node.
    node_singleton_references: SparseSecondaryMap<GraphNodeId, Vec<Option<GraphNodeId>>>,
    /// What variable name each graph node belongs to (if any). For debugging (graph writing) purposes only.
    node_varnames: SparseSecondaryMap<GraphNodeId, Varname>,

    /// If this subgraph is 'lazy' then when it sends data to a lower stratum it does not cause a new tick to start.
    /// This is to support lazy defers.
    /// If the value does not exist for a given subgraph id then the subgraph is not lazy.
    subgraph_laziness: SecondaryMap<GraphSubgraphId, bool>,
}
84
85/// Basic methods.
86impl DfirGraph {
87    /// Create a new empty graph.
88    pub fn new() -> Self {
89        Default::default()
90    }
91}
92
93/// Node methods.
94impl DfirGraph {
95    /// Get a node with its operator instance (if applicable).
96    pub fn node(&self, node_id: GraphNodeId) -> &GraphNode {
97        self.nodes.get(node_id).expect("Node not found.")
98    }
99
100    /// Get the `OperatorInstance` for a given node. Node must be an operator and have an
101    /// `OperatorInstance` present, otherwise will return `None`.
102    ///
103    /// Note that no operator instances will be persent after deserialization.
104    pub fn node_op_inst(&self, node_id: GraphNodeId) -> Option<&OperatorInstance> {
105        self.operator_instances.get(node_id)
106    }
107
108    /// Get the debug variable name attached to a graph node.
109    pub fn node_varname(&self, node_id: GraphNodeId) -> Option<&Varname> {
110        self.node_varnames.get(node_id)
111    }
112
113    /// Get subgraph for node.
114    pub fn node_subgraph(&self, node_id: GraphNodeId) -> Option<GraphSubgraphId> {
115        self.node_subgraph.get(node_id).copied()
116    }
117
118    /// Degree into a node, i.e. the number of predecessors.
119    pub fn node_degree_in(&self, node_id: GraphNodeId) -> usize {
120        self.graph.degree_in(node_id)
121    }
122
123    /// Degree out of a node, i.e. the number of successors.
124    pub fn node_degree_out(&self, node_id: GraphNodeId) -> usize {
125        self.graph.degree_out(node_id)
126    }
127
128    /// Successors, iterator of `(GraphEdgeId, GraphNodeId)` of outgoing edges.
129    pub fn node_successors(
130        &self,
131        src: GraphNodeId,
132    ) -> impl '_
133    + DoubleEndedIterator<Item = (GraphEdgeId, GraphNodeId)>
134    + ExactSizeIterator
135    + FusedIterator
136    + Clone
137    + Debug {
138        self.graph.successors(src)
139    }
140
141    /// Predecessors, iterator of `(GraphEdgeId, GraphNodeId)` of incoming edges.
142    pub fn node_predecessors(
143        &self,
144        dst: GraphNodeId,
145    ) -> impl '_
146    + DoubleEndedIterator<Item = (GraphEdgeId, GraphNodeId)>
147    + ExactSizeIterator
148    + FusedIterator
149    + Clone
150    + Debug {
151        self.graph.predecessors(dst)
152    }
153
154    /// Successor edges, iterator of `GraphEdgeId` of outgoing edges.
155    pub fn node_successor_edges(
156        &self,
157        src: GraphNodeId,
158    ) -> impl '_
159    + DoubleEndedIterator<Item = GraphEdgeId>
160    + ExactSizeIterator
161    + FusedIterator
162    + Clone
163    + Debug {
164        self.graph.successor_edges(src)
165    }
166
167    /// Predecessor edges, iterator of `GraphEdgeId` of incoming edges.
168    pub fn node_predecessor_edges(
169        &self,
170        dst: GraphNodeId,
171    ) -> impl '_
172    + DoubleEndedIterator<Item = GraphEdgeId>
173    + ExactSizeIterator
174    + FusedIterator
175    + Clone
176    + Debug {
177        self.graph.predecessor_edges(dst)
178    }
179
180    /// Successor nodes, iterator of `GraphNodeId`.
181    pub fn node_successor_nodes(
182        &self,
183        src: GraphNodeId,
184    ) -> impl '_
185    + DoubleEndedIterator<Item = GraphNodeId>
186    + ExactSizeIterator
187    + FusedIterator
188    + Clone
189    + Debug {
190        self.graph.successor_vertices(src)
191    }
192
193    /// Predecessor nodes, iterator of `GraphNodeId`.
194    pub fn node_predecessor_nodes(
195        &self,
196        dst: GraphNodeId,
197    ) -> impl '_
198    + DoubleEndedIterator<Item = GraphNodeId>
199    + ExactSizeIterator
200    + FusedIterator
201    + Clone
202    + Debug {
203        self.graph.predecessor_vertices(dst)
204    }
205
206    /// Iterator of node IDs `GraphNodeId`.
207    pub fn node_ids(&self) -> slotmap::basic::Keys<'_, GraphNodeId, GraphNode> {
208        self.nodes.keys()
209    }
210
211    /// Iterator over `(GraphNodeId, &Node)` pairs.
212    pub fn nodes(&self) -> slotmap::basic::Iter<'_, GraphNodeId, GraphNode> {
213        self.nodes.iter()
214    }
215
216    /// Insert a node, assigning the given varname.
217    pub fn insert_node(
218        &mut self,
219        node: GraphNode,
220        varname_opt: Option<Ident>,
221        loop_opt: Option<GraphLoopId>,
222    ) -> GraphNodeId {
223        let node_id = self.nodes.insert(node);
224        if let Some(varname) = varname_opt {
225            self.node_varnames.insert(node_id, Varname(varname));
226        }
227        if let Some(loop_id) = loop_opt {
228            self.node_loops.insert(node_id, loop_id);
229            self.loop_nodes[loop_id].push(node_id);
230        }
231        node_id
232    }
233
234    /// Insert an operator instance for the given node. Panics if already set.
235    pub fn insert_node_op_inst(&mut self, node_id: GraphNodeId, op_inst: OperatorInstance) {
236        assert!(matches!(
237            self.nodes.get(node_id),
238            Some(GraphNode::Operator(_))
239        ));
240        let old_inst = self.operator_instances.insert(node_id, op_inst);
241        assert!(old_inst.is_none());
242    }
243
244    /// Assign all operator instances if not set. Write diagnostic messages/errors into `diagnostics`.
245    pub fn insert_node_op_insts_all(&mut self, diagnostics: &mut Diagnostics) {
246        let mut op_insts = Vec::new();
247        for (node_id, node) in self.nodes() {
248            let GraphNode::Operator(operator) = node else {
249                continue;
250            };
251            if self.node_op_inst(node_id).is_some() {
252                continue;
253            };
254
255            // Op constraints.
256            let Some(op_constraints) = find_op_op_constraints(operator) else {
257                diagnostics.push(Diagnostic::spanned(
258                    operator.path.span(),
259                    Level::Error,
260                    format!("Unknown operator `{}`", operator.name_string()),
261                ));
262                continue;
263            };
264
265            // Input and output ports.
266            let (input_ports, output_ports) = {
267                let mut input_edges: Vec<(&PortIndexValue, GraphNodeId)> = self
268                    .node_predecessors(node_id)
269                    .map(|(edge_id, pred_id)| (self.edge_ports(edge_id).1, pred_id))
270                    .collect();
271                // Ensure sorted by port index.
272                input_edges.sort();
273                let input_ports: Vec<PortIndexValue> = input_edges
274                    .into_iter()
275                    .map(|(port, _pred)| port)
276                    .cloned()
277                    .collect();
278
279                // Collect output arguments (successors).
280                let mut output_edges: Vec<(&PortIndexValue, GraphNodeId)> = self
281                    .node_successors(node_id)
282                    .map(|(edge_id, succ)| (self.edge_ports(edge_id).0, succ))
283                    .collect();
284                // Ensure sorted by port index.
285                output_edges.sort();
286                let output_ports: Vec<PortIndexValue> = output_edges
287                    .into_iter()
288                    .map(|(port, _succ)| port)
289                    .cloned()
290                    .collect();
291
292                (input_ports, output_ports)
293            };
294
295            // Generic arguments.
296            let generics = get_operator_generics(diagnostics, operator);
297            // Generic argument errors.
298            {
299                // Span of `generic_args` (if it exists), otherwise span of the operator name.
300                let generics_span = generics
301                    .generic_args
302                    .as_ref()
303                    .map(Spanned::span)
304                    .unwrap_or_else(|| operator.path.span());
305
306                if !op_constraints
307                    .persistence_args
308                    .contains(&generics.persistence_args.len())
309                {
310                    diagnostics.push(Diagnostic::spanned(
311                        generics.persistence_args_span().unwrap_or(generics_span),
312                        Level::Error,
313                        format!(
314                            "`{}` should have {} persistence lifetime arguments, actually has {}.",
315                            op_constraints.name,
316                            op_constraints.persistence_args.human_string(),
317                            generics.persistence_args.len()
318                        ),
319                    ));
320                }
321                if !op_constraints.type_args.contains(&generics.type_args.len()) {
322                    diagnostics.push(Diagnostic::spanned(
323                        generics.type_args_span().unwrap_or(generics_span),
324                        Level::Error,
325                        format!(
326                            "`{}` should have {} generic type arguments, actually has {}.",
327                            op_constraints.name,
328                            op_constraints.type_args.human_string(),
329                            generics.type_args.len()
330                        ),
331                    ));
332                }
333            }
334
335            op_insts.push((
336                node_id,
337                OperatorInstance {
338                    op_constraints,
339                    input_ports,
340                    output_ports,
341                    singletons_referenced: operator.singletons_referenced.clone(),
342                    generics,
343                    arguments_pre: operator.args.clone(),
344                    arguments_raw: operator.args_raw.clone(),
345                },
346            ));
347        }
348
349        for (node_id, op_inst) in op_insts {
350            self.insert_node_op_inst(node_id, op_inst);
351        }
352    }
353
354    /// Inserts a node between two existing nodes connected by the given `edge_id`.
355    ///
356    /// `edge`: (src, dst, dst_idx)
357    ///
358    /// Before: A (src) ------------> B (dst)
359    /// After:  A (src) -> X (new) -> B (dst)
360    ///
361    /// Returns the ID of X & ID of edge OUT of X.
362    ///
363    /// Note that both the edges will be new and `edge_id` will be removed. Both new edges will
364    /// get the edge type of the original edge.
365    pub fn insert_intermediate_node(
366        &mut self,
367        edge_id: GraphEdgeId,
368        new_node: GraphNode,
369    ) -> (GraphNodeId, GraphEdgeId) {
370        let span = Some(new_node.span());
371
372        // Make corresponding operator instance (if `node` is an operator).
373        let op_inst_opt = 'oc: {
374            let GraphNode::Operator(operator) = &new_node else {
375                break 'oc None;
376            };
377            let Some(op_constraints) = find_op_op_constraints(operator) else {
378                break 'oc None;
379            };
380            let (input_port, output_port) = self.ports.get(edge_id).cloned().unwrap();
381
382            let mut dummy_diagnostics = Diagnostics::new();
383            let generics = get_operator_generics(&mut dummy_diagnostics, operator);
384            assert!(dummy_diagnostics.is_empty());
385
386            Some(OperatorInstance {
387                op_constraints,
388                input_ports: vec![input_port],
389                output_ports: vec![output_port],
390                singletons_referenced: operator.singletons_referenced.clone(),
391                generics,
392                arguments_pre: operator.args.clone(),
393                arguments_raw: operator.args_raw.clone(),
394            })
395        };
396
397        // Insert new `node`.
398        let node_id = self.nodes.insert(new_node);
399        // Insert corresponding `OperatorInstance` if applicable.
400        if let Some(op_inst) = op_inst_opt {
401            self.operator_instances.insert(node_id, op_inst);
402        }
403        // Update edges to insert node within `edge_id`.
404        let (e0, e1) = self
405            .graph
406            .insert_intermediate_vertex(node_id, edge_id)
407            .unwrap();
408
409        // Update corresponding ports.
410        let (src_idx, dst_idx) = self.ports.remove(edge_id).unwrap();
411        self.ports
412            .insert(e0, (src_idx, PortIndexValue::Elided(span)));
413        self.ports
414            .insert(e1, (PortIndexValue::Elided(span), dst_idx));
415
416        (node_id, e1)
417    }
418
419    /// Remove the node `node_id` but preserves and connects the single predecessor and single successor.
420    /// Panics if the node does not have exactly one predecessor and one successor, or is not in the graph.
421    pub fn remove_intermediate_node(&mut self, node_id: GraphNodeId) {
422        assert_eq!(
423            1,
424            self.node_degree_in(node_id),
425            "Removed intermediate node must have one predecessor"
426        );
427        assert_eq!(
428            1,
429            self.node_degree_out(node_id),
430            "Removed intermediate node must have one successor"
431        );
432        assert!(
433            self.node_subgraph.is_empty() && self.subgraph_nodes.is_empty(),
434            "Should not remove intermediate node after subgraph partitioning"
435        );
436
437        assert!(self.nodes.remove(node_id).is_some());
438        let (new_edge_id, (pred_edge_id, succ_edge_id)) =
439            self.graph.remove_intermediate_vertex(node_id).unwrap();
440        self.operator_instances.remove(node_id);
441        self.node_varnames.remove(node_id);
442
443        let (src_port, _) = self.ports.remove(pred_edge_id).unwrap();
444        let (_, dst_port) = self.ports.remove(succ_edge_id).unwrap();
445        self.ports.insert(new_edge_id, (src_port, dst_port));
446    }
447
448    /// Helper method: determine the "color" (pull vs push) of a node based on its in and out degree,
449    /// excluding reference edges. If linear (1 in, 1 out), color is `None`, indicating it can be
450    /// either push or pull.
451    ///
452    /// Note that this does NOT consider `DelayType` barriers (which generally implies `Pull`).
453    pub(crate) fn node_color(&self, node_id: GraphNodeId) -> Option<Color> {
454        if matches!(self.node(node_id), GraphNode::Handoff { .. }) {
455            return Some(Color::Hoff);
456        }
457
458        // TODO(shadaj): this is a horrible hack
459        if let GraphNode::Operator(op) = self.node(node_id)
460            && (op.name_string() == "resolve_futures_blocking"
461                || op.name_string() == "resolve_futures_blocking_ordered")
462        {
463            return Some(Color::Push);
464        }
465
466        // In-degree, excluding ref-edges.
467        let inn_degree = self.node_predecessor_nodes(node_id).count();
468        // Out-degree excluding ref-edges.
469        let out_degree = self.node_successor_nodes(node_id).count();
470
471        match (inn_degree, out_degree) {
472            (0, 0) => None, // Generally should not happen, "Degenerate subgraph detected".
473            (0, 1) => Some(Color::Pull),
474            (1, 0) => Some(Color::Push),
475            (1, 1) => None, // Linear, can be either push or pull.
476            (_many, 0 | 1) => Some(Color::Pull),
477            (0 | 1, _many) => Some(Color::Push),
478            (_many, _to_many) => Some(Color::Comp),
479        }
480    }
481
482    /// Set the operator tag (for debugging/tracing).
483    pub fn set_operator_tag(&mut self, node_id: GraphNodeId, tag: String) {
484        self.operator_tag.insert(node_id, tag);
485    }
486}
487
488/// Singleton references.
489impl DfirGraph {
490    /// Set the singletons referenced for the `node_id` operator. Each reference corresponds to the
491    /// same index in the [`crate::parse::Operator::singletons_referenced`] vec.
492    pub fn set_node_singleton_references(
493        &mut self,
494        node_id: GraphNodeId,
495        singletons_referenced: Vec<Option<GraphNodeId>>,
496    ) -> Option<Vec<Option<GraphNodeId>>> {
497        self.node_singleton_references
498            .insert(node_id, singletons_referenced)
499    }
500
501    /// Gets the singletons referenced by a node. Returns an empty iterator for non-operators and
502    /// operators that do not reference singletons.
503    pub fn node_singleton_references(&self, node_id: GraphNodeId) -> &[Option<GraphNodeId>] {
504        self.node_singleton_references
505            .get(node_id)
506            .map(std::ops::Deref::deref)
507            .unwrap_or_default()
508    }
509}
510
511/// Module methods.
512impl DfirGraph {
513    /// When modules are imported into a flat graph, they come with an input and output ModuleBoundary node.
514    /// The partitioner doesn't understand these nodes and will panic if it encounters them.
515    /// merge_modules removes them from the graph, stitching the input and ouput sides of the ModuleBondaries based on their ports
516    /// For example:
517    ///     source_iter([]) -> \[myport\]ModuleBoundary(input)\[my_port\] -> map(|x| x) -> ModuleBoundary(output) -> null();
518    /// in the above eaxmple, the \[myport\] port will be used to connect the source_iter with the map that is inside of the module.
519    /// The output module boundary has elided ports, this is also used to match up the input/output across the module boundary.
520    pub fn merge_modules(&mut self) -> Result<(), Diagnostic> {
521        let mod_bound_nodes = self
522            .nodes()
523            .filter(|(_nid, node)| matches!(node, GraphNode::ModuleBoundary { .. }))
524            .map(|(nid, _node)| nid)
525            .collect::<Vec<_>>();
526
527        for mod_bound_node in mod_bound_nodes {
528            self.remove_module_boundary(mod_bound_node)?;
529        }
530
531        Ok(())
532    }
533
534    /// see `merge_modules`
535    /// This function removes a singular module boundary from the graph and performs the necessary stitching to fix the graph afterward.
536    /// `merge_modules` calls this function for each module boundary in the graph.
537    fn remove_module_boundary(&mut self, mod_bound_node: GraphNodeId) -> Result<(), Diagnostic> {
538        assert!(
539            self.node_subgraph.is_empty() && self.subgraph_nodes.is_empty(),
540            "Should not remove intermediate node after subgraph partitioning"
541        );
542
543        let mut mod_pred_ports = BTreeMap::new();
544        let mut mod_succ_ports = BTreeMap::new();
545
546        for mod_out_edge in self.node_predecessor_edges(mod_bound_node) {
547            let (pred_port, succ_port) = self.edge_ports(mod_out_edge);
548            mod_pred_ports.insert(succ_port.clone(), (mod_out_edge, pred_port.clone()));
549        }
550
551        for mod_inn_edge in self.node_successor_edges(mod_bound_node) {
552            let (pred_port, succ_port) = self.edge_ports(mod_inn_edge);
553            mod_succ_ports.insert(pred_port.clone(), (mod_inn_edge, succ_port.clone()));
554        }
555
556        if mod_pred_ports.keys().collect::<BTreeSet<_>>()
557            != mod_succ_ports.keys().collect::<BTreeSet<_>>()
558        {
559            // get module boundary node
560            let GraphNode::ModuleBoundary { input, import_expr } = self.node(mod_bound_node) else {
561                panic!();
562            };
563
564            if *input {
565                return Err(Diagnostic {
566                    span: *import_expr,
567                    level: Level::Error,
568                    message: format!(
569                        "The ports into the module did not match. input: {:?}, expected: {:?}",
570                        mod_pred_ports.keys().map(|x| x.to_string()).join(", "),
571                        mod_succ_ports.keys().map(|x| x.to_string()).join(", ")
572                    ),
573                });
574            } else {
575                return Err(Diagnostic {
576                    span: *import_expr,
577                    level: Level::Error,
578                    message: format!(
579                        "The ports out of the module did not match. output: {:?}, expected: {:?}",
580                        mod_succ_ports.keys().map(|x| x.to_string()).join(", "),
581                        mod_pred_ports.keys().map(|x| x.to_string()).join(", "),
582                    ),
583                });
584            }
585        }
586
587        for (port, (pred_edge, pred_port)) in mod_pred_ports {
588            let (succ_edge, succ_port) = mod_succ_ports.remove(&port).unwrap();
589
590            let (src, _) = self.edge(pred_edge);
591            let (_, dst) = self.edge(succ_edge);
592            self.remove_edge(pred_edge);
593            self.remove_edge(succ_edge);
594
595            let new_edge_id = self.graph.insert_edge(src, dst);
596            self.ports.insert(new_edge_id, (pred_port, succ_port));
597        }
598
599        self.graph.remove_vertex(mod_bound_node);
600        self.nodes.remove(mod_bound_node);
601
602        Ok(())
603    }
604}
605
606/// Edge methods.
607impl DfirGraph {
608    /// Get the `src` and `dst` for an edge: `(src GraphNodeId, dst GraphNodeId)`.
609    pub fn edge(&self, edge_id: GraphEdgeId) -> (GraphNodeId, GraphNodeId) {
610        let (src, dst) = self.graph.edge(edge_id).expect("Edge not found.");
611        (src, dst)
612    }
613
614    /// Get the source and destination ports for an edge: `(src &PortIndexValue, dst &PortIndexValue)`.
615    pub fn edge_ports(&self, edge_id: GraphEdgeId) -> (&PortIndexValue, &PortIndexValue) {
616        let (src_port, dst_port) = self.ports.get(edge_id).expect("Edge not found.");
617        (src_port, dst_port)
618    }
619
620    /// Iterator of all edge IDs `GraphEdgeId`.
621    pub fn edge_ids(&self) -> slotmap::basic::Keys<'_, GraphEdgeId, (GraphNodeId, GraphNodeId)> {
622        self.graph.edge_ids()
623    }
624
625    /// Iterator over all edges: `(GraphEdgeId, (src GraphNodeId, dst GraphNodeId))`.
626    pub fn edges(
627        &self,
628    ) -> impl '_
629    + ExactSizeIterator<Item = (GraphEdgeId, (GraphNodeId, GraphNodeId))>
630    + FusedIterator
631    + Clone
632    + Debug {
633        self.graph.edges()
634    }
635
636    /// Insert an edge between nodes thru the given ports.
637    pub fn insert_edge(
638        &mut self,
639        src: GraphNodeId,
640        src_port: PortIndexValue,
641        dst: GraphNodeId,
642        dst_port: PortIndexValue,
643    ) -> GraphEdgeId {
644        let edge_id = self.graph.insert_edge(src, dst);
645        self.ports.insert(edge_id, (src_port, dst_port));
646        edge_id
647    }
648
649    /// Removes an edge and its corresponding ports and edge type info.
650    pub fn remove_edge(&mut self, edge: GraphEdgeId) {
651        let (_src, _dst) = self.graph.remove_edge(edge).unwrap();
652        let (_src_port, _dst_port) = self.ports.remove(edge).unwrap();
653    }
654}
655
656/// Subgraph methods.
657impl DfirGraph {
658    /// Nodes belonging to the given subgraph.
659    pub fn subgraph(&self, subgraph_id: GraphSubgraphId) -> &Vec<GraphNodeId> {
660        self.subgraph_nodes
661            .get(subgraph_id)
662            .expect("Subgraph not found.")
663    }
664
665    /// Iterator over all subgraph IDs.
666    pub fn subgraph_ids(&self) -> slotmap::basic::Keys<'_, GraphSubgraphId, Vec<GraphNodeId>> {
667        self.subgraph_nodes.keys()
668    }
669
670    /// Iterator over all subgraphs, ID and members: `(GraphSubgraphId, Vec<GraphNodeId>)`.
671    pub fn subgraphs(&self) -> slotmap::basic::Iter<'_, GraphSubgraphId, Vec<GraphNodeId>> {
672        self.subgraph_nodes.iter()
673    }
674
675    /// Create a subgraph consisting of `node_ids`. Returns an error if any of the nodes are already in a subgraph.
676    pub fn insert_subgraph(
677        &mut self,
678        node_ids: Vec<GraphNodeId>,
679    ) -> Result<GraphSubgraphId, (GraphNodeId, GraphSubgraphId)> {
680        // Check none are already in subgraphs
681        for &node_id in node_ids.iter() {
682            if let Some(&old_sg_id) = self.node_subgraph.get(node_id) {
683                return Err((node_id, old_sg_id));
684            }
685        }
686        let subgraph_id = self.subgraph_nodes.insert_with_key(|sg_id| {
687            for &node_id in node_ids.iter() {
688                self.node_subgraph.insert(node_id, sg_id);
689            }
690            node_ids
691        });
692
693        Ok(subgraph_id)
694    }
695
696    /// Removes a node from its subgraph. Returns true if the node was in a subgraph.
697    pub fn remove_from_subgraph(&mut self, node_id: GraphNodeId) -> bool {
698        if let Some(old_sg_id) = self.node_subgraph.remove(node_id) {
699            self.subgraph_nodes[old_sg_id].retain(|&other_node_id| other_node_id != node_id);
700            true
701        } else {
702            false
703        }
704    }
705
706    /// Gets the stratum number of the subgraph.
707    pub fn subgraph_stratum(&self, sg_id: GraphSubgraphId) -> Option<usize> {
708        self.subgraph_stratum.get(sg_id).copied()
709    }
710
711    /// Set subgraph's stratum number, returning the old value if exists.
712    pub fn set_subgraph_stratum(
713        &mut self,
714        sg_id: GraphSubgraphId,
715        stratum: usize,
716    ) -> Option<usize> {
717        self.subgraph_stratum.insert(sg_id, stratum)
718    }
719
720    /// Gets whether the subgraph is lazy or not
721    fn subgraph_laziness(&self, sg_id: GraphSubgraphId) -> bool {
722        self.subgraph_laziness.get(sg_id).copied().unwrap_or(false)
723    }
724
725    /// Set subgraph's laziness, returning the old value.
726    pub fn set_subgraph_laziness(&mut self, sg_id: GraphSubgraphId, lazy: bool) -> bool {
727        self.subgraph_laziness.insert(sg_id, lazy).unwrap_or(false)
728    }
729
730    /// Returns the the stratum number of the largest (latest) stratum (inclusive).
731    pub fn max_stratum(&self) -> Option<usize> {
732        self.subgraph_stratum.values().copied().max()
733    }
734
735    /// Helper: finds the first index in `subgraph_nodes` where it transitions from pull to push.
736    fn find_pull_to_push_idx(&self, subgraph_nodes: &[GraphNodeId]) -> usize {
737        subgraph_nodes
738            .iter()
739            .position(|&node_id| {
740                self.node_color(node_id)
741                    .is_some_and(|color| Color::Pull != color)
742            })
743            .unwrap_or(subgraph_nodes.len())
744    }
745}
746
747/// Display/output methods.
748impl DfirGraph {
749    /// Helper to generate a deterministic `Ident` for the given node.
750    fn node_as_ident(&self, node_id: GraphNodeId, is_pred: bool) -> Ident {
751        let name = match &self.nodes[node_id] {
752            GraphNode::Operator(_) => format!("op_{:?}", node_id.data()),
753            GraphNode::Handoff { .. } => format!(
754                "hoff_{:?}_{}",
755                node_id.data(),
756                if is_pred { "recv" } else { "send" }
757            ),
758            GraphNode::ModuleBoundary { .. } => panic!(),
759        };
760        let span = match (is_pred, &self.nodes[node_id]) {
761            (_, GraphNode::Operator(operator)) => operator.span(),
762            (true, &GraphNode::Handoff { src_span, .. }) => src_span,
763            (false, &GraphNode::Handoff { dst_span, .. }) => dst_span,
764            (_, GraphNode::ModuleBoundary { .. }) => panic!(),
765        };
766        Ident::new(&name, span)
767    }
768
769    /// For per-node singleton references. Helper to generate a deterministic `Ident` for the given node.
770    fn node_as_singleton_ident(&self, node_id: GraphNodeId, span: Span) -> Ident {
771        Ident::new(&format!("singleton_op_{:?}", node_id.data()), span)
772    }
773
774    /// Resolve the singletons via [`Self::node_singleton_references`] for the given `node_id`.
775    fn helper_resolve_singletons(&self, node_id: GraphNodeId, span: Span) -> Vec<Ident> {
776        self.node_singleton_references(node_id)
777            .iter()
778            .map(|singleton_node_id| {
779                // TODO(mingwei): this `expect` should be caught in error checking
780                self.node_as_singleton_ident(
781                    singleton_node_id
782                        .expect("Expected singleton to be resolved but was not, this is a bug."),
783                    span,
784                )
785            })
786            .collect::<Vec<_>>()
787    }
788
789    /// Returns each subgraph's receive and send handoffs.
790    /// `Map<GraphSubgraphId, (recv handoffs, send handoffs)>`
791    fn helper_collect_subgraph_handoffs(
792        &self,
793    ) -> SecondaryMap<GraphSubgraphId, (Vec<GraphNodeId>, Vec<GraphNodeId>)> {
794        // Get data on handoff src and dst subgraphs.
795        let mut subgraph_handoffs: SecondaryMap<
796            GraphSubgraphId,
797            (Vec<GraphNodeId>, Vec<GraphNodeId>),
798        > = self
799            .subgraph_nodes
800            .keys()
801            .map(|k| (k, Default::default()))
802            .collect();
803
804        // For each handoff node, add it to the `send`/`recv` lists for the corresponding subgraphs.
805        for (hoff_id, node) in self.nodes() {
806            if !matches!(node, GraphNode::Handoff { .. }) {
807                continue;
808            }
809            // Receivers from the handoff. (Should really only be one).
810            for (_edge, succ_id) in self.node_successors(hoff_id) {
811                let succ_sg = self.node_subgraph(succ_id).unwrap();
812                subgraph_handoffs[succ_sg].0.push(hoff_id);
813            }
814            // Senders into the handoff. (Should really only be one).
815            for (_edge, pred_id) in self.node_predecessors(hoff_id) {
816                let pred_sg = self.node_subgraph(pred_id).unwrap();
817                subgraph_handoffs[pred_sg].1.push(hoff_id);
818            }
819        }
820
821        subgraph_handoffs
822    }
823
824    /// Code for adding all nested loops.
825    fn codegen_nested_loops(&self, df: &Ident) -> TokenStream {
826        // Breadth-first iteration from outermost (root) loops to deepest nested loops.
827        let mut out = TokenStream::new();
828        let mut queue = VecDeque::from_iter(self.root_loops.iter().copied());
829        while let Some(loop_id) = queue.pop_front() {
830            let parent_opt = self
831                .loop_parent(loop_id)
832                .map(|loop_id| loop_id.as_ident(Span::call_site()))
833                .map(|ident| quote! { Some(#ident) })
834                .unwrap_or_else(|| quote! { None });
835            let loop_name = loop_id.as_ident(Span::call_site());
836            out.append_all(quote! {
837                let #loop_name = #df.add_loop(#parent_opt);
838            });
839            queue.extend(self.loop_children.get(loop_id).into_iter().flatten());
840        }
841        out
842    }
843
844    /// Emit this graph as a `Dfir` instance with handoffs, subgraph closures, and a
845    /// runtime scheduler that drives execution.
846    ///
847    /// See also [`Self::as_code_inline`] for the experimental inline codegen path.
848    ///
       /// # Arguments
       ///
       /// * `root` - Tokens spliced in front of runtime item paths in the generated code (e.g.
       ///   `#root::scheduled::graph::Dfir`, `#root::dfir_pipes::...`).
       /// * `include_type_guards` - If `true`, each operator's output is additionally passed through
       ///   a generated pass-through `Pull`/`Push` wrapper function named after the operator.
       /// * `prefix` - Tokens emitted at the top of the generated block, before the `Dfir` instance
       ///   is created.
       /// * `diagnostics` - Handed to each operator's `write_fn`; an error is also pushed here when a
       ///   degenerate subgraph is detected.
       ///
       /// # Errors
       ///
849    /// Returns all diagnostics as `Err(diagnostics)` if any are errors (leaving `&mut diagnostics` empty).
       ///
       /// # Panics
       ///
       /// May panic if the graph contains a `GraphNode::ModuleBoundary` node, if an operator's name
       /// is not found in `OPERATORS`, or if an operator's `write_fn` returns `Err` without having
       /// emitted an error diagnostic.
850    pub fn as_code(
851        &self,
852        root: &TokenStream,
853        include_type_guards: bool,
854        prefix: TokenStream,
855        diagnostics: &mut Diagnostics,
856    ) -> Result<TokenStream, Diagnostics> {
857        let df = Ident::new(GRAPH, Span::call_site());
858        let context = Ident::new(CONTEXT, Span::call_site());
859
860        // Code for adding handoffs.
           // Note: this is a lazy iterator; it only runs when interpolated into `code` below.
861        let handoff_code = self
862            .nodes
863            .iter()
864            .filter_map(|(node_id, node)| match node {
865                GraphNode::Operator(_) => None,
866                &GraphNode::Handoff { src_span, dst_span } => Some((node_id, (src_span, dst_span))),
867                GraphNode::ModuleBoundary { .. } => panic!(),
868            })
869            .map(|(node_id, (src_span, dst_span))| {
870                let ident_send = Ident::new(&format!("hoff_{:?}_send", node_id.data()), dst_span);
871                let ident_recv = Ident::new(&format!("hoff_{:?}_recv", node_id.data()), src_span);
872                let span = src_span.join(dst_span).unwrap_or(src_span);
873                let mut hoff_name = Literal::string(&format!("handoff {:?}", node_id));
874                hoff_name.set_span(span);
875                let hoff_type = quote_spanned! (span=> #root::scheduled::handoff::VecHandoff<_>);
876                quote_spanned! {span=>
877                    let (#ident_send, #ident_recv) =
878                        #df.make_edge::<_, #hoff_type>(#hoff_name);
879                }
880            });
881
882        let subgraph_handoffs = self.helper_collect_subgraph_handoffs();
883
884        // We first generate the subgraphs that have no inputs (i.e. that contain a node with
           // in-degree zero) to guide type inference.
885        let (subgraphs_without_preds, subgraphs_with_preds) = self
886            .subgraph_nodes
887            .iter()
888            .partition::<Vec<_>, _>(|(_, nodes)| {
889                nodes
890                    .iter()
891                    .any(|&node_id| self.node_degree_in(node_id) == 0)
892            });
893
894        let mut op_prologue_code = Vec::new();
895        let mut op_prologue_after_code = Vec::new();
896        let mut subgraphs = Vec::new();
897        {
898            for &(subgraph_id, subgraph_nodes) in subgraphs_without_preds
899                .iter()
900                .chain(subgraphs_with_preds.iter())
901            {
902                let (recv_hoffs, send_hoffs) = &subgraph_handoffs[subgraph_id];
903                let recv_ports: Vec<Ident> = recv_hoffs
904                    .iter()
905                    .map(|&hoff_id| self.node_as_ident(hoff_id, true))
906                    .collect();
907                let send_ports: Vec<Ident> = send_hoffs
908                    .iter()
909                    .map(|&hoff_id| self.node_as_ident(hoff_id, false))
910                    .collect();
911
912                let recv_port_code = recv_ports.iter().map(|ident| {
913                    quote_spanned! {ident.span()=>
914                        let mut #ident = #ident.borrow_mut_swap();
915                        let #ident = #root::dfir_pipes::pull::iter(#ident.drain(..));
916                    }
917                });
918                let send_port_code = send_ports.iter().map(|ident| {
919                    quote_spanned! {ident.span()=>
920                        let mut #ident = #ident.borrow_mut_give();
921                        let #ident = #root::dfir_pipes::push::vec_push(&mut *#ident);
922                    }
923                });
924
925                let loop_id = self
926                    // All nodes in a subgraph should be in the same loop.
927                    .node_loop(subgraph_nodes[0]);
928
929                let mut subgraph_op_iter_code = Vec::new();
930                let mut subgraph_op_iter_after_code = Vec::new();
931                {
932                    let pull_to_push_idx = self.find_pull_to_push_idx(subgraph_nodes);
933
                       // Visit the pull half in order, then the push half in reverse order.
934                    let (pull_half, push_half) = subgraph_nodes.split_at(pull_to_push_idx);
935                    let nodes_iter = pull_half.iter().chain(push_half.iter().rev());
936
937                    for (idx, &node_id) in nodes_iter.enumerate() {
938                        let node = &self.nodes[node_id];
939                        assert!(
940                            matches!(node, GraphNode::Operator(_)),
941                            "Handoffs are not part of subgraphs."
942                        );
943                        let op_inst = &self.operator_instances[node_id];
944
945                        let op_span = node.span();
946                        let op_name = op_inst.op_constraints.name;
947                        // Use op's span for root. #root is expected to be correct, any errors should span back to the op gen.
948                        let root = change_spans(root.clone(), op_span);
949                        // TODO(mingwei): Just use `op_inst.op_constraints`?
950                        let op_constraints = OPERATORS
951                            .iter()
952                            .find(|op| op_name == op.name)
953                            .unwrap_or_else(|| panic!("Failed to find op: {}", op_name));
954
955                        let ident = self.node_as_ident(node_id, false);
956
957                        {
958                            // TODO clean this up.
959                            // Collect input arguments (predecessors).
960                            let mut input_edges = self
961                                .graph
962                                .predecessor_edges(node_id)
963                                .map(|edge_id| (self.edge_ports(edge_id).1, edge_id))
964                                .collect::<Vec<_>>();
965                            // Ensure sorted by port index.
966                            input_edges.sort();
967
968                            let inputs = input_edges
969                                .iter()
970                                .map(|&(_port, edge_id)| {
971                                    let (pred, _) = self.edge(edge_id);
972                                    self.node_as_ident(pred, true)
973                                })
974                                .collect::<Vec<_>>();
975
976                            // Collect output arguments (successors).
977                            let mut output_edges = self
978                                .graph
979                                .successor_edges(node_id)
980                                .map(|edge_id| (&self.ports[edge_id].0, edge_id))
981                                .collect::<Vec<_>>();
982                            // Ensure sorted by port index.
983                            output_edges.sort();
984
985                            let outputs = output_edges
986                                .iter()
987                                .map(|&(_port, edge_id)| {
988                                    let (_, succ) = self.edge(edge_id);
989                                    self.node_as_ident(succ, false)
990                                })
991                                .collect::<Vec<_>>();
992
                               // The first `pull_to_push_idx` items yielded by `nodes_iter` are the pull half.
993                            let is_pull = idx < pull_to_push_idx;
994
995                            let singleton_output_ident = &if op_constraints.has_singleton_output {
996                                self.node_as_singleton_ident(node_id, op_span)
997                            } else {
998                                // This ident *should* go unused.
999                                Ident::new(&format!("{}_has_no_singleton_output", op_name), op_span)
1000                            };
1001
1002                            // There's a bit of dark magic hidden in `Span`s... you'd think it's just a `file:line:column`,
1003                            // but it has one extra bit of info for _name resolution_, used for `Ident`s. `Span::call_site()`
1004                            // has the (unhygienic) resolution we want, an ident is just solely determined by its string name,
1005                            // which is what you'd expect out of unhygienic proc macros like this. Meanwhile, declarative macros
1006                            // use `Span::mixed_site()` which is weird and I don't understand it. It turns out that if you call
1007                            // the dfir syntax proc macro from _within_ a declarative macro then `op_span` will have the
1008                            // bad `Span::mixed_site()` name resolution and cause "Cannot find value `df/context`" errors. So
1009                            // we call `.resolved_at()` to fix resolution back to `Span::call_site()`. -Mingwei
1010                            let df_local = &Ident::new(GRAPH, op_span.resolved_at(df.span()));
1011                            let context = &Ident::new(CONTEXT, op_span.resolved_at(context.span()));
1012
1013                            let singletons_resolved =
1014                                self.helper_resolve_singletons(node_id, op_span);
1015                            let arguments = &process_singletons::postprocess_singletons(
1016                                op_inst.arguments_raw.clone(),
1017                                singletons_resolved.clone(),
1018                                context,
1019                            );
1020                            let arguments_handles =
1021                                &process_singletons::postprocess_singletons_handles(
1022                                    op_inst.arguments_raw.clone(),
1023                                    singletons_resolved.clone(),
1024                                );
1025
                                // Tag identifying the operator's source: an explicit operator tag if one is set,
                                // otherwise derived from the span (including the file path only on nightly).
1026                            let source_tag = 'a: {
1027                                if let Some(tag) = self.operator_tag.get(node_id).cloned() {
1028                                    break 'a tag;
1029                                }
1030
1031                                #[cfg(nightly)]
1032                                if proc_macro::is_available() {
1033                                    let op_span = op_span.unwrap();
1034                                    break 'a format!(
1035                                        "loc_{}_{}_{}_{}_{}",
1036                                        crate::pretty_span::make_source_path_relative(
1037                                            &op_span.file()
1038                                        )
1039                                        .display()
1040                                        .to_string()
1041                                        .replace(|x: char| !x.is_ascii_alphanumeric(), "_"),
1042                                        op_span.start().line(),
1043                                        op_span.start().column(),
1044                                        op_span.end().line(),
1045                                        op_span.end().column(),
1046                                    );
1047                                }
1048
1049                                format!(
1050                                    "loc_nopath_{}_{}_{}_{}",
1051                                    op_span.start().line,
1052                                    op_span.start().column,
1053                                    op_span.end().line,
1054                                    op_span.end().column
1055                                )
1056                            };
1057
1058                            let work_fn = format_ident!(
1059                                "{}__{}__{}",
1060                                ident,
1061                                op_name,
1062                                source_tag,
1063                                span = op_span
1064                            );
1065                            let work_fn_async = format_ident!("{}__async", work_fn, span = op_span);
1066
1067                            let context_args = WriteContextArgs {
1068                                root: &root,
1069                                df_ident: df_local,
1070                                context,
1071                                subgraph_id,
1072                                node_id,
1073                                loop_id,
1074                                op_span,
1075                                op_tag: self.operator_tag.get(node_id).cloned(),
1076                                work_fn: &work_fn,
1077                                work_fn_async: &work_fn_async,
1078                                ident: &ident,
1079                                is_pull,
1080                                inputs: &inputs,
1081                                outputs: &outputs,
1082                                singleton_output_ident,
1083                                op_name,
1084                                op_inst,
1085                                arguments,
1086                                arguments_handles,
1087                            };
1088
1089                            let write_result =
1090                                (op_constraints.write_fn)(&context_args, diagnostics);
                                // On `Err`, fall back to a null iterator so codegen can continue and collect
                                // further diagnostics; the error itself must already be in `diagnostics`.
1091                            let OperatorWriteOutput {
1092                                write_prologue,
1093                                write_prologue_after,
1094                                write_iterator,
1095                                write_iterator_after,
1096                            } = write_result.unwrap_or_else(|()| {
1097                                assert!(
1098                                    diagnostics.has_error(),
1099                                    "Operator `{}` returned `Err` but emitted no diagnostics, this is a bug.",
1100                                    op_name,
1101                                );
1102                                OperatorWriteOutput { write_iterator: null_write_iterator_fn(&context_args), ..Default::default() }
1103                            });
1104
                                // Pass-through wrapper functions (sync and async) named after the operator.
1105                            op_prologue_code.push(syn::parse_quote! {
1106                                #[allow(non_snake_case)]
1107                                #[inline(always)]
1108                                fn #work_fn<T>(thunk: impl ::std::ops::FnOnce() -> T) -> T {
1109                                    thunk()
1110                                }
1111
1112                                #[allow(non_snake_case)]
1113                                #[inline(always)]
1114                                async fn #work_fn_async<T>(thunk: impl ::std::future::Future<Output = T>) -> T {
1115                                    thunk.await
1116                                }
1117                            });
1118                            op_prologue_code.push(write_prologue);
1119                            op_prologue_after_code.push(write_prologue_after);
1120                            subgraph_op_iter_code.push(write_iterator);
1121
1122                            if include_type_guards {
                                    // Rebind `#ident` to itself wrapped in a pass-through `Pull` (pull half) or
                                    // `Push` (push half) that delegates every method to the wrapped value.
1123                                let type_guard = if is_pull {
1124                                    quote_spanned! {op_span=>
1125                                        let #ident = {
1126                                            #[allow(non_snake_case)]
1127                                            #[inline(always)]
1128                                            pub fn #work_fn<Item, Input>(input: Input)
1129                                                -> impl #root::dfir_pipes::pull::Pull<Item = Item, Meta = (), CanPend = Input::CanPend, CanEnd = Input::CanEnd>
1130                                            where
1131                                                Input: #root::dfir_pipes::pull::Pull<Item = Item, Meta = ()>,
1132                                            {
1133                                                #root::pin_project_lite::pin_project! {
1134                                                    #[repr(transparent)]
1135                                                    struct Pull<Item, Input: #root::dfir_pipes::pull::Pull<Item = Item>> {
1136                                                        #[pin]
1137                                                        inner: Input
1138                                                    }
1139                                                }
1140
1141                                                impl<Item, Input> #root::dfir_pipes::pull::Pull for Pull<Item, Input>
1142                                                where
1143                                                    Input: #root::dfir_pipes::pull::Pull<Item = Item>,
1144                                                {
1145                                                    type Ctx<'ctx> = Input::Ctx<'ctx>;
1146
1147                                                    type Item = Item;
1148                                                    type Meta = Input::Meta;
1149                                                    type CanPend = Input::CanPend;
1150                                                    type CanEnd = Input::CanEnd;
1151
1152                                                    #[inline(always)]
1153                                                    fn pull(
1154                                                        self: ::std::pin::Pin<&mut Self>,
1155                                                        ctx: &mut Self::Ctx<'_>,
1156                                                    ) -> #root::dfir_pipes::pull::PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
1157                                                        #root::dfir_pipes::pull::Pull::pull(self.project().inner, ctx)
1158                                                    }
1159
1160                                                    #[inline(always)]
1161                                                    fn size_hint(&self) -> (usize, Option<usize>) {
1162                                                        #root::dfir_pipes::pull::Pull::size_hint(&self.inner)
1163                                                    }
1164                                                }
1165
1166                                                Pull {
1167                                                    inner: input
1168                                                }
1169                                            }
1170                                            #work_fn::<_, _>( #ident )
1171                                        };
1172                                    }
1173                                } else {
1174                                    quote_spanned! {op_span=>
1175                                        let #ident = {
1176                                            #[allow(non_snake_case)]
1177                                            #[inline(always)]
1178                                            pub fn #work_fn<Item, Psh>(psh: Psh) -> impl #root::dfir_pipes::push::Push<Item, (), CanPend = Psh::CanPend>
1179                                            where
1180                                                Psh: #root::dfir_pipes::push::Push<Item, ()>
1181                                            {
1182                                                #root::pin_project_lite::pin_project! {
1183                                                    #[repr(transparent)]
1184                                                    struct PushGuard<Psh> {
1185                                                        #[pin]
1186                                                        inner: Psh,
1187                                                    }
1188                                                }
1189
1190                                                impl<Item, Psh> #root::dfir_pipes::push::Push<Item, ()> for PushGuard<Psh>
1191                                                where
1192                                                    Psh: #root::dfir_pipes::push::Push<Item, ()>,
1193                                                {
1194                                                    type Ctx<'ctx> = Psh::Ctx<'ctx>;
1195
1196                                                    type CanPend = Psh::CanPend;
1197
1198                                                    #[inline(always)]
1199                                                    fn poll_ready(
1200                                                        self: ::std::pin::Pin<&mut Self>,
1201                                                        ctx: &mut Self::Ctx<'_>,
1202                                                    ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1203                                                        #root::dfir_pipes::push::Push::poll_ready(self.project().inner, ctx)
1204                                                    }
1205
1206                                                    #[inline(always)]
1207                                                    fn start_send(
1208                                                        self: ::std::pin::Pin<&mut Self>,
1209                                                        item: Item,
1210                                                        meta: (),
1211                                                    ) {
1212                                                        #root::dfir_pipes::push::Push::start_send(self.project().inner, item, meta)
1213                                                    }
1214
1215                                                    #[inline(always)]
1216                                                    fn poll_flush(
1217                                                        self: ::std::pin::Pin<&mut Self>,
1218                                                        ctx: &mut Self::Ctx<'_>,
1219                                                    ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1220                                                        #root::dfir_pipes::push::Push::poll_flush(self.project().inner, ctx)
1221                                                    }
1222
1223                                                    #[inline(always)]
1224                                                    fn size_hint(
1225                                                        self: ::std::pin::Pin<&mut Self>,
1226                                                        hint: (usize, Option<usize>),
1227                                                    ) {
1228                                                        #root::dfir_pipes::push::Push::size_hint(self.project().inner, hint)
1229                                                    }
1230                                                }
1231
1232                                                PushGuard {
1233                                                    inner: psh
1234                                                }
1235                                            }
1236                                            #work_fn( #ident )
1237                                        };
1238                                    }
1239                                };
1240                                subgraph_op_iter_code.push(type_guard);
1241                            }
1242                            subgraph_op_iter_after_code.push(write_iterator_after);
1243                        }
1244                    }
1245
1246                    {
1247                        // Determine pull and push halves of the `Pivot`.
1248                        let pull_ident = if 0 < pull_to_push_idx {
1249                            self.node_as_ident(subgraph_nodes[pull_to_push_idx - 1], false)
1250                        } else {
1251                            // Entire subgraph is push (with a single recv/pull handoff input).
1252                            recv_ports[0].clone()
1253                        };
1254
1255                        #[rustfmt::skip]
1256                        let push_ident = if let Some(&node_id) =
1257                            subgraph_nodes.get(pull_to_push_idx)
1258                        {
1259                            self.node_as_ident(node_id, false)
1260                        } else if 1 == send_ports.len() {
1261                            // Entire subgraph is pull (with a single send/push handoff output).
1262                            send_ports[0].clone()
1263                        } else {
1264                            diagnostics.push(Diagnostic::spanned(
1265                                pull_ident.span(),
1266                                Level::Error,
1267                                "Degenerate subgraph detected, is there a disconnected `null()` or other degenerate pipeline somewhere?",
1268                            ));
                                // Skip to the next subgraph; the error pushed above makes this function
                                // return `Err` once all subgraphs have been visited.
1269                            continue;
1270                        };
1271
1272                        // Pivot span is combination of pull and push spans (or if not possible, just take the push).
1273                        let pivot_span = pull_ident
1274                            .span()
1275                            .join(push_ident.span())
1276                            .unwrap_or_else(|| push_ident.span());
1277                        let pivot_fn_ident =
1278                            Ident::new(&format!("pivot_run_sg_{:?}", subgraph_id.0), pivot_span);
1279                        let root = change_spans(root.clone(), pivot_span);
1280                        subgraph_op_iter_code.push(quote_spanned! {pivot_span=>
1281                            #[inline(always)]
1282                            fn #pivot_fn_ident<Pul, Psh, Item>(pull: Pul, push: Psh)
1283                                -> impl ::std::future::Future<Output = ()>
1284                            where
1285                                Pul: #root::dfir_pipes::pull::Pull<Item = Item>,
1286                                Psh: #root::dfir_pipes::push::Push<Item, Pul::Meta>,
1287                            {
1288                                #root::dfir_pipes::pull::Pull::send_push(pull, push)
1289                            }
1290                            (#pivot_fn_ident)(#pull_ident, #push_ident).await;
1291                        });
1292                    }
1293                };
1294
1295                let subgraph_name = Literal::string(&format!("Subgraph {:?}", subgraph_id));
1296                let stratum = Literal::usize_unsuffixed(
1297                    self.subgraph_stratum.get(subgraph_id).cloned().unwrap_or(0),
1298                );
1299                let laziness = self.subgraph_laziness(subgraph_id);
1300
1301                // Codegen: the loop that this subgraph is in `Some(<loop_id>)`, or `None` if not in a loop.
1302                let loop_id_opt = loop_id
1303                    .map(|loop_id| loop_id.as_ident(Span::call_site()))
1304                    .map(|ident| quote! { Some(#ident) })
1305                    .unwrap_or_else(|| quote! { None });
1306
1307                let sg_ident = subgraph_id.as_ident(Span::call_site());
1308
1309                subgraphs.push(quote! {
1310                    let #sg_ident = #df.add_subgraph_full(
1311                        #subgraph_name,
1312                        #stratum,
1313                        var_expr!( #( #recv_ports ),* ),
1314                        var_expr!( #( #send_ports ),* ),
1315                        #laziness,
1316                        #loop_id_opt,
1317                        async move |#context, var_args!( #( #recv_ports ),* ), var_args!( #( #send_ports ),* )| {
1318                            #( #recv_port_code )*
1319                            #( #send_port_code )*
1320                            #( #subgraph_op_iter_code )*
1321                            #( #subgraph_op_iter_after_code )*
1322                        },
1323                    );
1324                });
1325            }
1326        }
1327
1328        if diagnostics.has_error() {
1329            return Err(std::mem::take(diagnostics));
1330        }
1331        let _ = diagnostics; // Ensure no more diagnostics may be added after checking for errors.
            // NOTE(review): `let _ = ...` neither moves nor shadows the reference; `diagnostics` is
            // still read below when serializing the remaining (non-error) diagnostics.
1332
1333        let loop_code = self.codegen_nested_loops(&df);
1334
1335        // These two are quoted separately here because iterators are lazily evaluated, so this
1336        // forces them to do their work. This work includes populating some data, namely
1337        // `diagnostics`, which we need to determine if compilation was actually successful.
1338        // -Mingwei
1339        let code = quote! {
1340            #( #handoff_code )*
1341            #loop_code
1342            #( #op_prologue_code )*
1343            #( #subgraphs )*
1344            #( #op_prologue_after_code )*
1345        };
1346
            // Embed the serialized meta graph and the remaining diagnostics as string literals.
1347        let meta_graph_json = serde_json::to_string(&self).unwrap();
1348        let meta_graph_json = Literal::string(&meta_graph_json);
1349
1350        let serde_diagnostics: Vec<_> = diagnostics.iter().map(Diagnostic::to_serde).collect();
1351        let diagnostics_json = serde_json::to_string(&*serde_diagnostics).unwrap();
1352        let diagnostics_json = Literal::string(&diagnostics_json);
1353
1354        Ok(quote! {
1355            {
1356                #[allow(unused_qualifications, clippy::await_holding_refcell_ref)]
1357                {
1358                    #prefix
1359
1360                    use #root::{var_expr, var_args};
1361
1362                    let mut #df = #root::scheduled::graph::Dfir::new();
1363                    #df.__assign_meta_graph(#meta_graph_json);
1364                    #df.__assign_diagnostics(#diagnostics_json);
1365
1366                    #code
1367
1368                    #df
1369                }
1370            }
1371        })
1372    }
1373
1374    /// Emit this graph as runnable Rust source code tokens that execute inline,
1375    /// without the `Dfir` runtime scheduler.
1376    ///
1377    /// Unlike [`Self::as_code`], which builds a `Dfir` graph object with handoffs,
1378    /// subgraph closures, and a scheduler that drives execution, this method generates
1379    /// a flat async closure where subgraph blocks are inlined in stratum order using
1380    /// local `Vec<T>` buffers instead of handoffs. Each call to the closure runs one
1381    /// tick. State is managed by a lightweight `InlineContext` instead of the full
1382    /// `Dfir` runtime.
1383    ///
1384    /// This is an experimental codegen path for the S3+Ref3 inline DAG design.
1385    pub fn as_code_inline(
1386        &self,
1387        root: &TokenStream,
1388        include_type_guards: bool,
1389        prefix: TokenStream,
1390        diagnostics: &mut Diagnostics,
1391    ) -> Result<TokenStream, Diagnostics> {
1392        let df = Ident::new(GRAPH, Span::call_site());
1393        let context = Ident::new(CONTEXT, Span::call_site());
1394
1395        // 1. Generate local Vec buffers for each handoff node.
1396        let buffer_code: Vec<TokenStream> = self
1397            .nodes
1398            .iter()
1399            .filter_map(|(node_id, node)| match node {
1400                GraphNode::Operator(_) => None,
1401                &GraphNode::Handoff { src_span, dst_span } => Some((node_id, (src_span, dst_span))),
1402                GraphNode::ModuleBoundary { .. } => panic!(),
1403            })
1404            .map(|(node_id, (src_span, dst_span))| {
1405                let span = src_span.join(dst_span).unwrap_or(src_span);
1406                let buf_ident = Ident::new(&format!("hoff_{:?}_buf", node_id.data()), span);
1407                quote_spanned! {span=>
1408                    let mut #buf_ident: Vec<_> = Vec::new();
1409                }
1410            })
1411            .collect();
1412
1413        // 2. Collect subgraph handoffs (same as as_code).
1414        let subgraph_handoffs = self.helper_collect_subgraph_handoffs();
1415
1416        // 3. Sort subgraphs by stratum, then sources (no-preds) first (for type inference).
1417        let mut all_subgraphs: Vec<_> = self.subgraph_nodes.iter().collect();
1418        all_subgraphs.sort_by_key(|&(sg_id, nodes)| {
1419            let stratum = self.subgraph_stratum.get(sg_id).copied().unwrap_or(0);
1420            let is_source = nodes
1421                .iter()
1422                .any(|&node_id| self.node_degree_in(node_id) == 0);
1423            (stratum, !is_source)
1424        });
1425
1426        let mut op_prologue_code = Vec::new();
1427        let mut op_prologue_after_code = Vec::new();
1428        let mut subgraph_blocks = Vec::new();
1429        {
1430            for &(subgraph_id, subgraph_nodes) in all_subgraphs.iter() {
1431                let (recv_hoffs, send_hoffs) = &subgraph_handoffs[subgraph_id];
1432
1433                // Generate buffer ident helpers for this subgraph's handoffs.
1434                let recv_port_idents: Vec<Ident> = recv_hoffs
1435                    .iter()
1436                    .map(|&hoff_id| self.node_as_ident(hoff_id, true))
1437                    .collect();
1438                let send_port_idents: Vec<Ident> = send_hoffs
1439                    .iter()
1440                    .map(|&hoff_id| self.node_as_ident(hoff_id, false))
1441                    .collect();
1442
1443                // Map handoff node IDs to buffer idents.
1444                let recv_buf_idents: Vec<Ident> = recv_hoffs
1445                    .iter()
1446                    .map(|&hoff_id| {
1447                        let span = self.nodes[hoff_id].span();
1448                        Ident::new(&format!("hoff_{:?}_buf", hoff_id.data()), span)
1449                    })
1450                    .collect();
1451                let send_buf_idents: Vec<Ident> = send_hoffs
1452                    .iter()
1453                    .map(|&hoff_id| {
1454                        let span = self.nodes[hoff_id].span();
1455                        Ident::new(&format!("hoff_{:?}_buf", hoff_id.data()), span)
1456                    })
1457                    .collect();
1458
1459                // Recv port code: drain from buffer into iterator.
1460                let recv_port_code: Vec<TokenStream> = recv_port_idents
1461                    .iter()
1462                    .zip(recv_buf_idents.iter())
1463                    .map(|(port_ident, buf_ident)| {
1464                        quote_spanned! {port_ident.span()=>
1465                            let #port_ident = #root::dfir_pipes::pull::iter(#buf_ident.drain(..));
1466                        }
1467                    })
1468                    .collect();
1469
1470                // Send port code: push into buffer.
1471                let send_port_code: Vec<TokenStream> = send_port_idents
1472                    .iter()
1473                    .zip(send_buf_idents.iter())
1474                    .map(|(port_ident, buf_ident)| {
1475                        quote_spanned! {port_ident.span()=>
1476                            let #port_ident = #root::dfir_pipes::push::vec_push(&mut #buf_ident);
1477                        }
1478                    })
1479                    .collect();
1480
1481                // All nodes in a subgraph should be in the same loop.
1482                let loop_id = self.node_loop(subgraph_nodes[0]);
1483
1484                let mut subgraph_op_iter_code = Vec::new();
1485                let mut subgraph_op_iter_after_code = Vec::new();
1486                {
1487                    let pull_to_push_idx = self.find_pull_to_push_idx(subgraph_nodes);
1488
1489                    let (pull_half, push_half) = subgraph_nodes.split_at(pull_to_push_idx);
1490                    let nodes_iter = pull_half.iter().chain(push_half.iter().rev());
1491
1492                    for (idx, &node_id) in nodes_iter.enumerate() {
1493                        let node = &self.nodes[node_id];
1494                        assert!(
1495                            matches!(node, GraphNode::Operator(_)),
1496                            "Handoffs are not part of subgraphs."
1497                        );
1498                        let op_inst = &self.operator_instances[node_id];
1499
1500                        let op_span = node.span();
1501                        let op_name = op_inst.op_constraints.name;
1502                        // Use op's span for root. #root is expected to be correct, any errors should span back to the op gen.
1503                        let root = change_spans(root.clone(), op_span);
1504                        let op_constraints = OPERATORS
1505                            .iter()
1506                            .find(|op| op_name == op.name)
1507                            .unwrap_or_else(|| panic!("Failed to find op: {}", op_name));
1508
1509                        let ident = self.node_as_ident(node_id, false);
1510
1511                        {
1512                            // TODO clean this up.
1513                            // Collect input arguments (predecessors).
1514                            let mut input_edges = self
1515                                .graph
1516                                .predecessor_edges(node_id)
1517                                .map(|edge_id| (self.edge_ports(edge_id).1, edge_id))
1518                                .collect::<Vec<_>>();
1519                            // Ensure sorted by port index.
1520                            input_edges.sort();
1521
1522                            let inputs = input_edges
1523                                .iter()
1524                                .map(|&(_port, edge_id)| {
1525                                    let (pred, _) = self.edge(edge_id);
1526                                    self.node_as_ident(pred, true)
1527                                })
1528                                .collect::<Vec<_>>();
1529
1530                            // Collect output arguments (successors).
1531                            let mut output_edges = self
1532                                .graph
1533                                .successor_edges(node_id)
1534                                .map(|edge_id| (&self.ports[edge_id].0, edge_id))
1535                                .collect::<Vec<_>>();
1536                            // Ensure sorted by port index.
1537                            output_edges.sort();
1538
1539                            let outputs = output_edges
1540                                .iter()
1541                                .map(|&(_port, edge_id)| {
1542                                    let (_, succ) = self.edge(edge_id);
1543                                    self.node_as_ident(succ, false)
1544                                })
1545                                .collect::<Vec<_>>();
1546
1547                            let is_pull = idx < pull_to_push_idx;
1548
1549                            let singleton_output_ident = &if op_constraints.has_singleton_output {
1550                                self.node_as_singleton_ident(node_id, op_span)
1551                            } else {
1552                                // This ident *should* go unused.
1553                                Ident::new(&format!("{}_has_no_singleton_output", op_name), op_span)
1554                            };
1555
1556                            // There's a bit of dark magic hidden in `Span`s... you'd think it's just a `file:line:column`,
1557                            // but it has one extra bit of info for _name resolution_, used for `Ident`s. `Span::call_site()`
1558                            // has the (unhygienic) resolution we want, an ident is just solely determined by its string name,
1559                            // which is what you'd expect out of unhygienic proc macros like this. Meanwhile, declarative macros
1560                            // use `Span::mixed_site()` which is weird and I don't understand it. It turns out that if you call
1561                            // the dfir syntax proc macro from _within_ a declarative macro then `op_span` will have the
1562                            // bad `Span::mixed_site()` name resolution and cause "Cannot find value `df/context`" errors. So
1563                            // we call `.resolved_at()` to fix resolution back to `Span::call_site()`. -Mingwei
1564                            let df_local = &Ident::new(GRAPH, op_span.resolved_at(df.span()));
1565                            let context = &Ident::new(CONTEXT, op_span.resolved_at(context.span()));
1566
1567                            let singletons_resolved =
1568                                self.helper_resolve_singletons(node_id, op_span);
1569                            let arguments = &process_singletons::postprocess_singletons(
1570                                op_inst.arguments_raw.clone(),
1571                                singletons_resolved.clone(),
1572                                context,
1573                            );
1574                            let arguments_handles =
1575                                &process_singletons::postprocess_singletons_handles(
1576                                    op_inst.arguments_raw.clone(),
1577                                    singletons_resolved.clone(),
1578                                );
1579
1580                            let source_tag = 'a: {
1581                                if let Some(tag) = self.operator_tag.get(node_id).cloned() {
1582                                    break 'a tag;
1583                                }
1584
1585                                #[cfg(nightly)]
1586                                if proc_macro::is_available() {
1587                                    let op_span = op_span.unwrap();
1588                                    break 'a format!(
1589                                        "loc_{}_{}_{}_{}_{}",
1590                                        crate::pretty_span::make_source_path_relative(
1591                                            &op_span.file()
1592                                        )
1593                                        .display()
1594                                        .to_string()
1595                                        .replace(|x: char| !x.is_ascii_alphanumeric(), "_"),
1596                                        op_span.start().line(),
1597                                        op_span.start().column(),
1598                                        op_span.end().line(),
1599                                        op_span.end().column(),
1600                                    );
1601                                }
1602
1603                                format!(
1604                                    "loc_nopath_{}_{}_{}_{}",
1605                                    op_span.start().line,
1606                                    op_span.start().column,
1607                                    op_span.end().line,
1608                                    op_span.end().column
1609                                )
1610                            };
1611
1612                            let work_fn = format_ident!(
1613                                "{}__{}__{}",
1614                                ident,
1615                                op_name,
1616                                source_tag,
1617                                span = op_span
1618                            );
1619                            let work_fn_async = format_ident!("{}__async", work_fn, span = op_span);
1620
1621                            let context_args = WriteContextArgs {
1622                                root: &root,
1623                                df_ident: df_local,
1624                                context,
1625                                subgraph_id,
1626                                node_id,
1627                                loop_id,
1628                                op_span,
1629                                op_tag: self.operator_tag.get(node_id).cloned(),
1630                                work_fn: &work_fn,
1631                                work_fn_async: &work_fn_async,
1632                                ident: &ident,
1633                                is_pull,
1634                                inputs: &inputs,
1635                                outputs: &outputs,
1636                                singleton_output_ident,
1637                                op_name,
1638                                op_inst,
1639                                arguments,
1640                                arguments_handles,
1641                            };
1642
1643                            let write_result =
1644                                (op_constraints.write_fn)(&context_args, diagnostics);
1645                            let OperatorWriteOutput {
1646                                write_prologue,
1647                                write_prologue_after,
1648                                write_iterator,
1649                                write_iterator_after,
1650                            } = write_result.unwrap_or_else(|()| {
1651                                assert!(
1652                                    diagnostics.has_error(),
1653                                    "Operator `{}` returned `Err` but emitted no diagnostics, this is a bug.",
1654                                    op_name,
1655                                );
1656                                OperatorWriteOutput {
1657                                    write_iterator: null_write_iterator_fn(&context_args),
1658                                    ..Default::default()
1659                                }
1660                            });
1661
1662                            op_prologue_code.push(syn::parse_quote! {
1663                                #[allow(non_snake_case)]
1664                                #[inline(always)]
1665                                fn #work_fn<T>(thunk: impl ::std::ops::FnOnce() -> T) -> T {
1666                                    thunk()
1667                                }
1668
1669                                #[allow(non_snake_case)]
1670                                #[inline(always)]
1671                                async fn #work_fn_async<T>(
1672                                    thunk: impl ::std::future::Future<Output = T>,
1673                                ) -> T {
1674                                    thunk.await
1675                                }
1676                            });
1677                            op_prologue_code.push(write_prologue);
1678                            op_prologue_after_code.push(write_prologue_after);
1679                            subgraph_op_iter_code.push(write_iterator);
1680
1681                            if include_type_guards {
1682                                let type_guard = if is_pull {
1683                                    quote_spanned! {op_span=>
1684                                        let #ident = {
1685                                            #[allow(non_snake_case)]
1686                                            #[inline(always)]
1687                                            pub fn #work_fn<Item, Input>(input: Input)
1688                                                -> impl #root::dfir_pipes::pull::Pull<Item = Item, Meta = (), CanPend = Input::CanPend, CanEnd = Input::CanEnd>
1689                                            where
1690                                                Input: #root::dfir_pipes::pull::Pull<Item = Item, Meta = ()>,
1691                                            {
1692                                                #root::pin_project_lite::pin_project! {
1693                                                    #[repr(transparent)]
1694                                                    struct Pull<Item, Input: #root::dfir_pipes::pull::Pull<Item = Item>> {
1695                                                        #[pin]
1696                                                        inner: Input
1697                                                    }
1698                                                }
1699
1700                                                impl<Item, Input> #root::dfir_pipes::pull::Pull for Pull<Item, Input>
1701                                                where
1702                                                    Input: #root::dfir_pipes::pull::Pull<Item = Item>,
1703                                                {
1704                                                    type Ctx<'ctx> = Input::Ctx<'ctx>;
1705
1706                                                    type Item = Item;
1707                                                    type Meta = Input::Meta;
1708                                                    type CanPend = Input::CanPend;
1709                                                    type CanEnd = Input::CanEnd;
1710
1711                                                    #[inline(always)]
1712                                                    fn pull(
1713                                                        self: ::std::pin::Pin<&mut Self>,
1714                                                        ctx: &mut Self::Ctx<'_>,
1715                                                    ) -> #root::dfir_pipes::pull::PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
1716                                                        #root::dfir_pipes::pull::Pull::pull(self.project().inner, ctx)
1717                                                    }
1718
1719                                                    #[inline(always)]
1720                                                    fn size_hint(&self) -> (usize, Option<usize>) {
1721                                                        #root::dfir_pipes::pull::Pull::size_hint(&self.inner)
1722                                                    }
1723                                                }
1724
1725                                                Pull {
1726                                                    inner: input
1727                                                }
1728                                            }
1729                                            #work_fn::<_, _>( #ident )
1730                                        };
1731                                    }
1732                                } else {
1733                                    quote_spanned! {op_span=>
1734                                        let #ident = {
1735                                            #[allow(non_snake_case)]
1736                                            #[inline(always)]
1737                                            pub fn #work_fn<Item, Psh>(psh: Psh) -> impl #root::dfir_pipes::push::Push<Item, (), CanPend = Psh::CanPend>
1738                                            where
1739                                                Psh: #root::dfir_pipes::push::Push<Item, ()>
1740                                            {
1741                                                #root::pin_project_lite::pin_project! {
1742                                                    #[repr(transparent)]
1743                                                    struct PushGuard<Psh> {
1744                                                        #[pin]
1745                                                        inner: Psh,
1746                                                    }
1747                                                }
1748
1749                                                impl<Item, Psh> #root::dfir_pipes::push::Push<Item, ()> for PushGuard<Psh>
1750                                                where
1751                                                    Psh: #root::dfir_pipes::push::Push<Item, ()>,
1752                                                {
1753                                                    type Ctx<'ctx> = Psh::Ctx<'ctx>;
1754
1755                                                    type CanPend = Psh::CanPend;
1756
1757                                                    #[inline(always)]
1758                                                    fn poll_ready(
1759                                                        self: ::std::pin::Pin<&mut Self>,
1760                                                        ctx: &mut Self::Ctx<'_>,
1761                                                    ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1762                                                        #root::dfir_pipes::push::Push::poll_ready(self.project().inner, ctx)
1763                                                    }
1764
1765                                                    #[inline(always)]
1766                                                    fn start_send(
1767                                                        self: ::std::pin::Pin<&mut Self>,
1768                                                        item: Item,
1769                                                        meta: (),
1770                                                    ) {
1771                                                        #root::dfir_pipes::push::Push::start_send(self.project().inner, item, meta)
1772                                                    }
1773
1774                                                    #[inline(always)]
1775                                                    fn poll_flush(
1776                                                        self: ::std::pin::Pin<&mut Self>,
1777                                                        ctx: &mut Self::Ctx<'_>,
1778                                                    ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1779                                                        #root::dfir_pipes::push::Push::poll_flush(self.project().inner, ctx)
1780                                                    }
1781
1782                                                    #[inline(always)]
1783                                                    fn size_hint(
1784                                                        self: ::std::pin::Pin<&mut Self>,
1785                                                        hint: (usize, Option<usize>),
1786                                                    ) {
1787                                                        #root::dfir_pipes::push::Push::size_hint(self.project().inner, hint)
1788                                                    }
1789                                                }
1790
1791                                                PushGuard {
1792                                                    inner: psh
1793                                                }
1794                                            }
1795                                            #work_fn( #ident )
1796                                        };
1797                                    }
1798                                };
1799                                subgraph_op_iter_code.push(type_guard);
1800                            }
1801                            subgraph_op_iter_after_code.push(write_iterator_after);
1802                        }
1803                    }
1804
1805                    {
1806                        // Determine pull and push halves of the `Pivot`.
1807                        let pull_ident = if 0 < pull_to_push_idx {
1808                            self.node_as_ident(subgraph_nodes[pull_to_push_idx - 1], false)
1809                        } else {
1810                            // Entire subgraph is push (with a single recv/pull handoff input).
1811                            recv_port_idents[0].clone()
1812                        };
1813
1814                        #[rustfmt::skip]
1815                        let push_ident = if let Some(&node_id) =
1816                            subgraph_nodes.get(pull_to_push_idx)
1817                        {
1818                            self.node_as_ident(node_id, false)
1819                        } else if 1 == send_port_idents.len() {
1820                            // Entire subgraph is pull (with a single send/push handoff output).
1821                            send_port_idents[0].clone()
1822                        } else {
1823                            diagnostics.push(Diagnostic::spanned(
1824                                pull_ident.span(),
1825                                Level::Error,
1826                                "Degenerate subgraph detected, is there a disconnected `null()` or other degenerate pipeline somewhere?",
1827                            ));
1828                            continue;
1829                        };
1830
1831                        // Pivot span is combination of pull and push spans (or if not possible, just take the push).
1832                        let pivot_span = pull_ident
1833                            .span()
1834                            .join(push_ident.span())
1835                            .unwrap_or_else(|| push_ident.span());
1836                        let pivot_fn_ident =
1837                            Ident::new(&format!("pivot_run_sg_{:?}", subgraph_id.0), pivot_span);
1838                        let root = change_spans(root.clone(), pivot_span);
1839                        subgraph_op_iter_code.push(quote_spanned! {pivot_span=>
1840                            #[inline(always)]
1841                            fn #pivot_fn_ident<Pul, Psh, Item>(pull: Pul, push: Psh)
1842                                -> impl ::std::future::Future<Output = ()>
1843                            where
1844                                Pul: #root::dfir_pipes::pull::Pull<Item = Item>,
1845                                Psh: #root::dfir_pipes::push::Push<Item, Pul::Meta>,
1846                            {
1847                                #root::dfir_pipes::pull::Pull::send_push(pull, push)
1848                            }
1849                            (#pivot_fn_ident)(#pull_ident, #push_ident).await;
1850                        });
1851                    }
1852                };
1853
1854                // Generate a dummy subgraph ID for state lifespan hooks.
1855                let sg_ident = subgraph_id.as_ident(Span::call_site());
1856
1857                // Each subgraph block — .await works because the whole output is async.
1858                subgraph_blocks.push(quote! {
1859                    let #sg_ident: #root::scheduled::SubgraphId = #root::util::slot_vec::Key::from_raw(0);
1860                    {
1861                        let #context = &#df;;
1862                        #( #recv_port_code )*
1863                        #( #send_port_code )*
1864                        #( #subgraph_op_iter_code )*
1865                        #( #subgraph_op_iter_after_code )*
1866                    }
1867                });
1868
1869                // Collect per-subgraph prologues into the main prologue lists.
1870                // (They are already pushed above in the operator loop.)
1871            }
1872        }
1873
1874        if diagnostics.has_error() {
1875            return Err(std::mem::take(diagnostics));
1876        }
1877        let _ = diagnostics; // Ensure no more diagnostics may be added after checking for errors.
1878        // Prologues and buffer declarations persist across ticks (outside the closure).
1879        // Subgraph blocks run each tick (inside the closure).
1880        Ok(quote! {
1881            {
1882                #prefix
1883
1884                use #root::{var_expr, var_args};
1885
1886                #[allow(unused_mut)]
1887                let mut #df = #root::scheduled::context::InlineContext::new();
1888
1889                #( #buffer_code )*
1890                #( #op_prologue_code )*
1891                #( #op_prologue_after_code )*
1892
1893                #[allow(unused_qualifications, unused_mut, unused_variables, clippy::await_holding_refcell_ref)]
1894                let mut __dfir_inline_closure = async move || {
1895                    #( #subgraph_blocks )*
1896
1897                    #df.__end_tick();
1898                };
1899                __dfir_inline_closure
1900            }
1901        })
1902    }
1903
1904    /// Color mode (pull vs. push, handoff vs. comp) for nodes. Some nodes can be push *OR* pull;
1905    /// those nodes will not be set in the returned map.
1906    pub fn node_color_map(&self) -> SparseSecondaryMap<GraphNodeId, Color> {
1907        let mut node_color_map: SparseSecondaryMap<GraphNodeId, Color> = self
1908            .node_ids()
1909            .filter_map(|node_id| {
1910                let op_color = self.node_color(node_id)?;
1911                Some((node_id, op_color))
1912            })
1913            .collect();
1914
1915        // Fill in rest via subgraphs.
1916        for sg_nodes in self.subgraph_nodes.values() {
1917            let pull_to_push_idx = self.find_pull_to_push_idx(sg_nodes);
1918
1919            for (idx, node_id) in sg_nodes.iter().copied().enumerate() {
1920                let is_pull = idx < pull_to_push_idx;
1921                node_color_map.insert(node_id, if is_pull { Color::Pull } else { Color::Push });
1922            }
1923        }
1924
1925        node_color_map
1926    }
1927
1928    /// Writes this graph as mermaid into a string.
1929    pub fn to_mermaid(&self, write_config: &WriteConfig) -> String {
1930        let mut output = String::new();
1931        self.write_mermaid(&mut output, write_config).unwrap();
1932        output
1933    }
1934
1935    /// Writes this graph as mermaid into the given `Write`.
1936    pub fn write_mermaid(
1937        &self,
1938        output: impl std::fmt::Write,
1939        write_config: &WriteConfig,
1940    ) -> std::fmt::Result {
1941        let mut graph_write = Mermaid::new(output);
1942        self.write_graph(&mut graph_write, write_config)
1943    }
1944
1945    /// Writes this graph as DOT (graphviz) into a string.
1946    pub fn to_dot(&self, write_config: &WriteConfig) -> String {
1947        let mut output = String::new();
1948        let mut graph_write = Dot::new(&mut output);
1949        self.write_graph(&mut graph_write, write_config).unwrap();
1950        output
1951    }
1952
1953    /// Writes this graph as DOT (graphviz) into the given `Write`.
1954    pub fn write_dot(
1955        &self,
1956        output: impl std::fmt::Write,
1957        write_config: &WriteConfig,
1958    ) -> std::fmt::Result {
1959        let mut graph_write = Dot::new(output);
1960        self.write_graph(&mut graph_write, write_config)
1961    }
1962
    /// Write out this graph using the given `GraphWrite`. E.g. `Mermaid` or `Dot`.
    ///
    /// Output is emitted in this order: prologue, node definitions, edges, singleton reference
    /// edges, the nested `loop -> subgraph -> varname -> node` grouping, and finally the
    /// epilogue. The flags on `write_config` trim or flatten the individual stages.
    ///
    /// # Errors
    /// Returns the first error produced by any call on `graph_write`.
    ///
    /// # Panics
    /// - If handoffs are rendered and a handoff node has no predecessor or no successor.
    /// - If handoffs are skipped and a skipped handoff does not have exactly one successor.
    /// - If subgraphs are rendered and `subgraph_stratum` returns `None` for a subgraph.
    pub(crate) fn write_graph<W>(
        &self,
        mut graph_write: W,
        write_config: &WriteConfig,
    ) -> Result<(), W::Err>
    where
        W: GraphWrite,
    {
        /// Builds the label for an edge from its source and destination ports.
        ///
        /// `Path` and `Int` ports contribute text; any other port kind contributes nothing.
        /// When both ends contribute, the two labels are joined with a newline (source first).
        fn helper_edge_label(
            src_port: &PortIndexValue,
            dst_port: &PortIndexValue,
        ) -> Option<String> {
            let src_label = match src_port {
                PortIndexValue::Path(path) => Some(path.to_token_stream().to_string()),
                PortIndexValue::Int(index) => Some(index.value.to_string()),
                _ => None,
            };
            let dst_label = match dst_port {
                PortIndexValue::Path(path) => Some(path.to_token_stream().to_string()),
                PortIndexValue::Int(index) => Some(index.value.to_string()),
                _ => None,
            };
            let label = match (src_label, dst_label) {
                (Some(l1), Some(l2)) => Some(format!("{}\n{}", l1, l2)),
                (Some(l1), None) => Some(l1),
                (None, Some(l2)) => Some(l2),
                (None, None) => None,
            };
            label
        }

        // Make node color map one time.
        let node_color_map = self.node_color_map();

        // Write prologue.
        graph_write.write_prologue()?;

        // Define nodes.
        // `skipped_handoffs` remembers handoff nodes that were not defined (only populated when
        // `no_handoffs` is set) so the edge pass below can route around them.
        let mut skipped_handoffs = BTreeSet::new();
        // NOTE(review): `subgraph_handoffs` is filled in below but never read again within this
        // function -- confirm whether it is still needed.
        let mut subgraph_handoffs = <BTreeMap<GraphSubgraphId, Vec<GraphNodeId>>>::new();
        for (node_id, node) in self.nodes() {
            if matches!(node, GraphNode::Handoff { .. }) {
                if write_config.no_handoffs {
                    skipped_handoffs.insert(node_id);
                    continue;
                } else {
                    // Record handoffs whose first predecessor and first successor live in the
                    // same subgraph.
                    let pred_node = self.node_predecessor_nodes(node_id).next().unwrap();
                    let pred_sg = self.node_subgraph(pred_node);
                    let succ_node = self.node_successor_nodes(node_id).next().unwrap();
                    let succ_sg = self.node_subgraph(succ_node);
                    if let Some((pred_sg, succ_sg)) = pred_sg.zip(succ_sg)
                        && pred_sg == succ_sg
                    {
                        subgraph_handoffs.entry(pred_sg).or_default().push(node_id);
                    }
                }
            }
            graph_write.write_node_definition(
                node_id,
                // Node text: short name, full text minus import lines, or full text.
                &if write_config.op_short_text {
                    node.to_name_string()
                } else if write_config.op_text_no_imports {
                    // Remove any lines that start with "use" (imports)
                    let full_text = node.to_pretty_string();
                    let mut output = String::new();
                    for sentence in full_text.split('\n') {
                        if sentence.trim().starts_with("use") {
                            continue;
                        }
                        // Every kept line is preceded by a newline, including the first one.
                        output.push('\n');
                        output.push_str(sentence);
                    }
                    output.into()
                } else {
                    node.to_pretty_string()
                },
                // Node color: omitted entirely when pull/push rendering is disabled.
                if write_config.no_pull_push {
                    None
                } else {
                    node_color_map.get(node_id).copied()
                },
            )?;
        }

        // Write edges.
        for (edge_id, (src_id, mut dst_id)) in self.edges() {
            // Handling for if `write_config.no_handoffs` true.
            // Edges leaving a skipped handoff are dropped; the edge entering it is extended
            // below to reach the handoff's successor instead.
            if skipped_handoffs.contains(&src_id) {
                continue;
            }

            let (src_port, mut dst_port) = self.edge_ports(edge_id);
            if skipped_handoffs.contains(&dst_id) {
                // Redirect to the handoff's single successor, using that edge's destination port.
                let mut handoff_succs = self.node_successors(dst_id);
                assert_eq!(1, handoff_succs.len());
                let (succ_edge, succ_node) = handoff_succs.next().unwrap();
                dst_id = succ_node;
                dst_port = self.edge_ports(succ_edge).1;
            }

            let label = helper_edge_label(src_port, dst_port);
            // The delay type is looked up from the destination operator's constraints for the
            // destination port; `None` when the destination has no operator instance.
            let delay_type = self
                .node_op_inst(dst_id)
                .and_then(|op_inst| (op_inst.op_constraints.input_delaytype_fn)(dst_port));
            graph_write.write_edge(src_id, dst_id, delay_type, label.as_deref(), false)?;
        }

        // Write reference edges.
        if !write_config.no_references {
            for dst_id in self.node_ids() {
                for src_ref_id in self
                    .node_singleton_references(dst_id)
                    .iter()
                    .copied()
                    .flatten()
                {
                    // Reference edges are always written unlabeled with a stratum delay type.
                    let delay_type = Some(DelayType::Stratum);
                    let label = None;
                    graph_write.write_edge(src_ref_id, dst_id, delay_type, label, true)?;
                }
            }
        }

        // The following code is a little bit tricky. Generally, the graph has the hierarchy:
        // `loop -> subgraph -> varname -> node`. However, each of these can be disabled via the `write_config`. To
        // handle both the enabled and disabled case, this code is structured as a series of nested loops. If the layer
        // is disabled, then the BTreeMap<Option<KEY>, Vec<VALUE>> will only have a single key (`None`) with a
        // corresponding `Vec` value containing everything. This way no special handling is needed for the next layer.
        //
        // (Note: `stratum` could also be included in this hierarchy, but it is being phased-out/deprecated in favor of
        // Flo loops).

        // Loop -> Subgraphs
        let loop_subgraphs = self.subgraph_ids().map(|sg_id| {
            let loop_id = if write_config.no_loops {
                None
            } else {
                self.subgraph_loop(sg_id)
            };
            (loop_id, sg_id)
        });
        let loop_subgraphs = into_group_map(loop_subgraphs);
        for (loop_id, subgraph_ids) in loop_subgraphs {
            if let Some(loop_id) = loop_id {
                graph_write.write_loop_start(loop_id)?;
            }

            // Subgraph -> Varnames.
            let subgraph_varnames_nodes = subgraph_ids.into_iter().flat_map(|sg_id| {
                self.subgraph(sg_id).iter().copied().map(move |node_id| {
                    let opt_sg_id = if write_config.no_subgraphs {
                        None
                    } else {
                        Some(sg_id)
                    };
                    (opt_sg_id, (self.node_varname(node_id), node_id))
                })
            });
            let subgraph_varnames_nodes = into_group_map(subgraph_varnames_nodes);
            for (sg_id, varnames) in subgraph_varnames_nodes {
                if let Some(sg_id) = sg_id {
                    let stratum = self.subgraph_stratum(sg_id).unwrap();
                    graph_write.write_subgraph_start(sg_id, stratum)?;
                }

                // Varnames -> Nodes.
                let varname_nodes = varnames.into_iter().map(|(varname, node)| {
                    let varname = if write_config.no_varnames {
                        None
                    } else {
                        varname
                    };
                    (varname, node)
                });
                let varname_nodes = into_group_map(varname_nodes);
                for (varname, node_ids) in varname_nodes {
                    if let Some(varname) = varname {
                        graph_write.write_varname_start(&varname.0.to_string(), sg_id)?;
                    }

                    // Write all nodes.
                    for node_id in node_ids {
                        graph_write.write_node(node_id)?;
                    }

                    // Close only what was opened above for this group.
                    if varname.is_some() {
                        graph_write.write_varname_end()?;
                    }
                }

                if sg_id.is_some() {
                    graph_write.write_subgraph_end()?;
                }
            }

            if loop_id.is_some() {
                graph_write.write_loop_end()?;
            }
        }

        // Write epilogue.
        graph_write.write_epilogue()?;

        Ok(())
    }
2169
2170    /// Convert back into surface syntax.
2171    pub fn surface_syntax_string(&self) -> String {
2172        let mut string = String::new();
2173        self.write_surface_syntax(&mut string).unwrap();
2174        string
2175    }
2176
2177    /// Convert back into surface syntax.
2178    pub fn write_surface_syntax(&self, write: &mut impl std::fmt::Write) -> std::fmt::Result {
2179        for (key, node) in self.nodes.iter() {
2180            match node {
2181                GraphNode::Operator(op) => {
2182                    writeln!(write, "{:?} = {};", key.data(), op.to_token_stream())?;
2183                }
2184                GraphNode::Handoff { .. } => {
2185                    writeln!(write, "// {:?} = <handoff>;", key.data())?;
2186                }
2187                GraphNode::ModuleBoundary { .. } => panic!(),
2188            }
2189        }
2190        writeln!(write)?;
2191        for (_e, (src_key, dst_key)) in self.graph.edges() {
2192            writeln!(write, "{:?} -> {:?};", src_key.data(), dst_key.data())?;
2193        }
2194        Ok(())
2195    }
2196
2197    /// Convert into a [mermaid](https://mermaid-js.github.io/) graph. Ignores subgraphs.
2198    pub fn mermaid_string_flat(&self) -> String {
2199        let mut string = String::new();
2200        self.write_mermaid_flat(&mut string).unwrap();
2201        string
2202    }
2203
2204    /// Convert into a [mermaid](https://mermaid-js.github.io/) graph. Ignores subgraphs.
2205    pub fn write_mermaid_flat(&self, write: &mut impl std::fmt::Write) -> std::fmt::Result {
2206        writeln!(write, "flowchart TB")?;
2207        for (key, node) in self.nodes.iter() {
2208            match node {
2209                GraphNode::Operator(operator) => writeln!(
2210                    write,
2211                    "    %% {span}\n    {id:?}[\"{row_col} <tt>{code}</tt>\"]",
2212                    span = PrettySpan(node.span()),
2213                    id = key.data(),
2214                    row_col = PrettyRowCol(node.span()),
2215                    code = operator
2216                        .to_token_stream()
2217                        .to_string()
2218                        .replace('&', "&amp;")
2219                        .replace('<', "&lt;")
2220                        .replace('>', "&gt;")
2221                        .replace('"', "&quot;")
2222                        .replace('\n', "<br>"),
2223                ),
2224                GraphNode::Handoff { .. } => {
2225                    writeln!(write, r#"    {:?}{{"{}"}}"#, key.data(), HANDOFF_NODE_STR)
2226                }
2227                GraphNode::ModuleBoundary { .. } => {
2228                    writeln!(
2229                        write,
2230                        r#"    {:?}{{"{}"}}"#,
2231                        key.data(),
2232                        MODULE_BOUNDARY_NODE_STR
2233                    )
2234                }
2235            }?;
2236        }
2237        writeln!(write)?;
2238        for (_e, (src_key, dst_key)) in self.graph.edges() {
2239            writeln!(write, "    {:?}-->{:?}", src_key.data(), dst_key.data())?;
2240        }
2241        Ok(())
2242    }
2243}
2244
2245/// Loops
2246impl DfirGraph {
2247    /// Iterator over all loop IDs.
2248    pub fn loop_ids(&self) -> slotmap::basic::Keys<'_, GraphLoopId, Vec<GraphNodeId>> {
2249        self.loop_nodes.keys()
2250    }
2251
2252    /// Iterator over all loops, ID and members: `(GraphLoopId, Vec<GraphNodeId>)`.
2253    pub fn loops(&self) -> slotmap::basic::Iter<'_, GraphLoopId, Vec<GraphNodeId>> {
2254        self.loop_nodes.iter()
2255    }
2256
2257    /// Create a new loop context, with the given parent loop (or `None`).
2258    pub fn insert_loop(&mut self, parent_loop: Option<GraphLoopId>) -> GraphLoopId {
2259        let loop_id = self.loop_nodes.insert(Vec::new());
2260        self.loop_children.insert(loop_id, Vec::new());
2261        if let Some(parent_loop) = parent_loop {
2262            self.loop_parent.insert(loop_id, parent_loop);
2263            self.loop_children
2264                .get_mut(parent_loop)
2265                .unwrap()
2266                .push(loop_id);
2267        } else {
2268            self.root_loops.push(loop_id);
2269        }
2270        loop_id
2271    }
2272
2273    /// Get a node's loop context (or `None` for root).
2274    pub fn node_loop(&self, node_id: GraphNodeId) -> Option<GraphLoopId> {
2275        self.node_loops.get(node_id).copied()
2276    }
2277
2278    /// Get a subgraph's loop context (or `None` for root).
2279    pub fn subgraph_loop(&self, subgraph_id: GraphSubgraphId) -> Option<GraphLoopId> {
2280        let &node_id = self.subgraph(subgraph_id).first().unwrap();
2281        let out = self.node_loop(node_id);
2282        debug_assert!(
2283            self.subgraph(subgraph_id)
2284                .iter()
2285                .all(|&node_id| self.node_loop(node_id) == out),
2286            "Subgraph nodes should all have the same loop context."
2287        );
2288        out
2289    }
2290
2291    /// Get a loop context's parent loop context (or `None` for root).
2292    pub fn loop_parent(&self, loop_id: GraphLoopId) -> Option<GraphLoopId> {
2293        self.loop_parent.get(loop_id).copied()
2294    }
2295
2296    /// Get a loop context's child loops.
2297    pub fn loop_children(&self, loop_id: GraphLoopId) -> &Vec<GraphLoopId> {
2298        self.loop_children.get(loop_id).unwrap()
2299    }
2300}
2301
// All options are plain `bool` flags; the derived `Default` leaves every one of them `false`.
// With the `clap-derive` feature enabled each field is additionally exposed as a `--long`
// argument. NOTE(review): the `///` lines below presumably double as clap help text, so keep
// their wording user-facing -- confirm before rewording.
/// Configuration for writing graphs.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "clap-derive", derive(clap::Args))]
pub struct WriteConfig {
    /// Subgraphs will not be rendered if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_subgraphs: bool,
    /// Variable names will not be rendered if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_varnames: bool,
    /// Will not render pull/push shapes if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_pull_push: bool,
    /// Will not render handoffs if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_handoffs: bool,
    /// Will not render singleton references if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_references: bool,
    /// Will not render loops if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_loops: bool,

    // The remaining flags control how much operator text is shown per node.
    /// Op text will only be their name instead of the whole source.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub op_short_text: bool,
    /// Op text will exclude any line that starts with "use".
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub op_text_no_imports: bool,
}
2332
// The clap derives are only applied when the `clap-derive` feature is enabled.
// NOTE(review): the variant `///` lines presumably feed clap's value help -- confirm before
// rewording them.
/// Enum for choosing between mermaid and dot graph writing.
#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "clap-derive", derive(clap::Parser, clap::ValueEnum))]
pub enum WriteGraphType {
    /// Mermaid graphs.
    Mermaid,
    /// Dot (Graphviz) graphs.
    Dot,
}
2342
/// [`itertools::Itertools::into_group_map`], but for `BTreeMap`.
///
/// Collects `(key, value)` pairs into a map from each key to the values seen for it. Values
/// keep their input order within a key, and the map itself iterates in ascending key order.
fn into_group_map<K, V>(iter: impl IntoIterator<Item = (K, V)>) -> BTreeMap<K, Vec<V>>
where
    K: Ord,
{
    iter.into_iter()
        .fold(BTreeMap::new(), |mut groups, (key, value)| {
            let bucket: &mut Vec<V> = groups.entry(key).or_default();
            bucket.push(value);
            groups
        })
}