dfir_lang/graph/
flat_graph_builder.rs

//! Build a flat graph from [`DfirStatement`]s.
2
3use std::borrow::Cow;
4use std::collections::btree_map::Entry;
5use std::collections::{BTreeMap, BTreeSet};
6
7use itertools::Itertools;
8use proc_macro2::Span;
9use quote::ToTokens;
10use syn::spanned::Spanned;
11use syn::{Error, Ident, ItemUse};
12
13use super::ops::next_iteration::NEXT_ITERATION;
14use super::ops::{FloType, Persistence};
15use super::{DfirGraph, GraphEdgeId, GraphLoopId, GraphNode, GraphNodeId, PortIndexValue};
16use crate::diagnostic::{Diagnostic, Level};
17use crate::graph::graph_algorithms;
18use crate::graph::ops::{PortListSpec, RangeTrait};
19use crate::parse::{DfirCode, DfirStatement, Operator, Pipeline};
20use crate::pretty_span::PrettySpan;
21
/// The two open ends of a (partial) pipeline: where data can flow in and where it can flow out.
///
/// Each end pairs the port index written at that position with the node (or not yet resolved
/// variable name) it attaches to. An end is `None` when there is nothing to connect to, e.g.
/// `add_pipeline` returns `None` for both ends after reporting a misplaced `mod`.
#[derive(Clone, Debug)]
struct Ends {
    /// Input-side end: the port and target that a predecessor would connect into.
    inn: Option<(PortIndexValue, GraphDet)>,
    /// Output-side end: the port and target that a successor would connect from.
    out: Option<(PortIndexValue, GraphDet)>,
}
27
/// What an end of a pipeline attaches to: a concrete node, or a name still to be resolved.
#[derive(Clone, Debug)]
enum GraphDet {
    /// The end is attached to this node of the graph.
    Determined(GraphNodeId),
    /// The end refers to a variable name; it is looked up in `FlatGraphBuilder::varname_ends`
    /// by `FlatGraphBuilder::helper_resolve_name` once all statements have been added.
    Undetermined(Ident),
}
33
/// Variable name info for each ident, see [`FlatGraphBuilder::varname_ends`].
///
/// Created by `FlatGraphBuilder::assign_varname_checked` when a name is first assigned; the
/// flags are updated later by `FlatGraphBuilder::helper_resolve_name` as names are resolved.
#[derive(Debug)]
struct VarnameInfo {
    /// What the variable name resolves to.
    pub ends: Ends,
    /// Set to true if the varname reference creates an illegal self-referential cycle.
    /// Once set, any later resolution passing through this name fails as well.
    pub illegal_cycle: bool,
    /// Set to true once the in port is used. Used to track unused ports.
    pub inn_used: bool,
    /// Set to true once the out port is used. Used to track unused ports.
    pub out_used: bool,
}
46impl VarnameInfo {
47    pub fn new(ends: Ends) -> Self {
48        Self {
49            ends,
50            illegal_cycle: false,
51            inn_used: false,
52            out_used: false,
53        }
54    }
55}
56
/// Wrapper around [`DfirGraph`] to build a flat graph from AST code.
///
/// Statements are added one at a time; links between pipeline ends are only recorded while
/// adding and are resolved and inserted into the graph by [`FlatGraphBuilder::build`].
#[derive(Debug, Default)]
pub struct FlatGraphBuilder {
    /// Spanned error/warning/etc diagnostics to emit.
    diagnostics: Vec<Diagnostic>,

    /// [`DfirGraph`] being built.
    flat_graph: DfirGraph,
    /// Variable names, filled in as [`DfirStatement::Named`] statements are added.
    varname_ends: BTreeMap<Ident, VarnameInfo>,
    /// Each (out -> inn) link inputted. Drained when the links are finalized during `build`.
    links: Vec<Ends>,

    /// Use statements.
    uses: Vec<ItemUse>,

    /// If the flat graph is being loaded as a module, then two initial ModuleBoundary nodes are inserted into the graph. One
    /// for the input into the module and one for the output out of the module.
    /// The tuple is ordered `(input node, output node)`.
    module_boundary_nodes: Option<(GraphNodeId, GraphNodeId)>,
}
77
78impl FlatGraphBuilder {
79    /// Create a new empty graph builder.
80    pub fn new() -> Self {
81        Default::default()
82    }
83
84    /// Convert the DFIR code AST into a graph builder.
85    pub fn from_dfir(input: DfirCode) -> Self {
86        let mut builder = Self::default();
87        builder.add_dfir(input, None, None);
88        builder
89    }
90
91    /// Build into an unpartitioned [`DfirGraph`], returning a tuple of the graph and
92    /// any diagnostics.
93    ///
94    /// Even if there are errors, the graph will be returned (potentially in a invalid
95    /// state). Does not call `emit` on any diagnostics.
96    pub fn build(mut self) -> (DfirGraph, Vec<ItemUse>, Vec<Diagnostic>) {
97        self.finalize_connect_operator_links();
98        self.process_operator_errors();
99
100        (self.flat_graph, self.uses, self.diagnostics)
101    }
102
103    /// Adds all [`DfirStatement`]s within the [`DfirCode`] to this [`DfirGraph`].
104    ///
105    /// Optional configuration:
106    /// * In the given loop context `current_loop`.
107    /// * With the given operator tag `operator_tag`.
108    pub fn add_dfir(
109        &mut self,
110        dfir: DfirCode,
111        current_loop: Option<GraphLoopId>,
112        operator_tag: Option<&str>,
113    ) {
114        for stmt in dfir.statements {
115            self.add_statement_internal(stmt, current_loop, operator_tag);
116        }
117    }
118
119    /// Add a single [`DfirStatement`] line to this [`DfirGraph`] in the root context.
120    pub fn add_statement(&mut self, stmt: DfirStatement) {
121        self.add_statement_internal(stmt, None, None);
122    }
123
124    /// Add a single [`DfirStatement`] line to this [`DfirGraph`] with given configuration.
125    ///
126    /// Optional configuration:
127    /// * In the given loop context `current_loop`.
128    /// * With the given operator tag `operator_tag`.
129    fn add_statement_internal(
130        &mut self,
131        stmt: DfirStatement,
132        current_loop: Option<GraphLoopId>,
133        operator_tag: Option<&str>,
134    ) {
135        match stmt {
136            DfirStatement::Use(yuse) => {
137                self.uses.push(yuse);
138            }
139            DfirStatement::Named(named) => {
140                let stmt_span = named.span();
141                let ends = self.add_pipeline(
142                    named.pipeline,
143                    Some(&named.name),
144                    current_loop,
145                    operator_tag,
146                );
147                self.assign_varname_checked(named.name, stmt_span, ends);
148            }
149            DfirStatement::Pipeline(pipeline_stmt) => {
150                let ends =
151                    self.add_pipeline(pipeline_stmt.pipeline, None, current_loop, operator_tag);
152                Self::helper_check_unused_port(&mut self.diagnostics, &ends, true);
153                Self::helper_check_unused_port(&mut self.diagnostics, &ends, false);
154            }
155            DfirStatement::Loop(loop_statement) => {
156                let inner_loop = self.flat_graph.insert_loop(current_loop);
157                for stmt in loop_statement.statements {
158                    self.add_statement_internal(stmt, Some(inner_loop), operator_tag);
159                }
160            }
161        }
162    }
163
164    /// Programatically add an pipeline, optionally adding `pred_name` as a single predecessor and
165    /// assigning it all to `asgn_name`.
166    ///
167    /// In DFIR syntax, equivalent to [`Self::add_statement`] of (if all names are supplied):
168    /// ```text
169    /// #asgn_name = #pred_name -> #pipeline;
170    /// ```
171    ///
172    /// But with, optionally:
173    /// * A `current_loop` to put the operator in.
174    /// * An `operator_tag` to tag the operator with, for debugging/tracing.
175    pub fn append_assign_pipeline(
176        &mut self,
177        asgn_name: Option<&Ident>,
178        pred_name: Option<&Ident>,
179        pipeline: Pipeline,
180        current_loop: Option<GraphLoopId>,
181        operator_tag: Option<&str>,
182    ) {
183        let span = pipeline.span();
184        let mut ends = self.add_pipeline(pipeline, asgn_name, current_loop, operator_tag);
185
186        // Connect `pred_name` if supplied.
187        if let Some(pred_name) = pred_name {
188            if let Some(pred_varname_info) = self.varname_ends.get(pred_name) {
189                // Update ends for `asgn_name`.
190                ends = self.connect_ends(pred_varname_info.ends.clone(), ends);
191            } else {
192                self.diagnostics.push(Diagnostic::spanned(
193                    pred_name.span(),
194                    Level::Error,
195                    format!(
196                        "Cannot find referenced name `{}`; name was never assigned.",
197                        pred_name
198                    ),
199                ));
200            }
201        }
202
203        // Assign `asgn_name` if supplied.
204        if let Some(asgn_name) = asgn_name {
205            self.assign_varname_checked(asgn_name.clone(), span, ends);
206        }
207    }
208}
209
210/// Internal methods.
211impl FlatGraphBuilder {
212    /// Assign a variable name to a pipeline, checking for conflicts.
213    fn assign_varname_checked(&mut self, name: Ident, stmt_span: Span, ends: Ends) {
214        match self.varname_ends.entry(name) {
215            Entry::Vacant(vacant_entry) => {
216                vacant_entry.insert(VarnameInfo::new(ends));
217            }
218            Entry::Occupied(occupied_entry) => {
219                let prev_conflict = occupied_entry.key();
220                self.diagnostics.push(Diagnostic::spanned(
221                    prev_conflict.span(),
222                    Level::Error,
223                    format!(
224                        "Existing assignment to `{}` conflicts with later assignment: {} (1/2)",
225                        prev_conflict,
226                        PrettySpan(stmt_span),
227                    ),
228                ));
229                self.diagnostics.push(Diagnostic::spanned(
230                    stmt_span,
231                    Level::Error,
232                    format!(
233                        "Name assignment to `{}` conflicts with existing assignment: {} (2/2)",
234                        prev_conflict,
235                        PrettySpan(prev_conflict.span())
236                    ),
237                ));
238            }
239        }
240    }
241
242    /// Helper: Add a pipeline, i.e. `a -> b -> c`. Return the input and output [`Ends`] for it.
243    fn add_pipeline(
244        &mut self,
245        pipeline: Pipeline,
246        current_varname: Option<&Ident>,
247        current_loop: Option<GraphLoopId>,
248        operator_tag: Option<&str>,
249    ) -> Ends {
250        match pipeline {
251            Pipeline::Paren(ported_pipeline_paren) => {
252                let (inn_port, pipeline_paren, out_port) =
253                    PortIndexValue::from_ported(ported_pipeline_paren);
254                let og_ends = self.add_pipeline(
255                    *pipeline_paren.pipeline,
256                    current_varname,
257                    current_loop,
258                    operator_tag,
259                );
260                Self::helper_combine_ends(&mut self.diagnostics, og_ends, inn_port, out_port)
261            }
262            Pipeline::Name(pipeline_name) => {
263                let (inn_port, ident, out_port) = PortIndexValue::from_ported(pipeline_name);
264
265                // Mingwei: We could lookup non-forward references immediately, but easier to just
266                // have one consistent code path: `GraphDet::Undetermined`.
267                Ends {
268                    inn: Some((inn_port, GraphDet::Undetermined(ident.clone()))),
269                    out: Some((out_port, GraphDet::Undetermined(ident))),
270                }
271            }
272            Pipeline::ModuleBoundary(pipeline_name) => {
273                let Some((input_node, output_node)) = self.module_boundary_nodes else {
274                    self.diagnostics.push(
275                        Error::new(
276                            pipeline_name.span(),
277                            "`mod` is only usable inside of a module.",
278                        )
279                        .into(),
280                    );
281
282                    return Ends {
283                        inn: None,
284                        out: None,
285                    };
286                };
287
288                let (inn_port, _, out_port) = PortIndexValue::from_ported(pipeline_name);
289
290                Ends {
291                    inn: Some((inn_port, GraphDet::Determined(output_node))),
292                    out: Some((out_port, GraphDet::Determined(input_node))),
293                }
294            }
295            Pipeline::Link(pipeline_link) => {
296                // Add the nested LHS and RHS of this link.
297                let lhs_ends = self.add_pipeline(
298                    *pipeline_link.lhs,
299                    current_varname,
300                    current_loop,
301                    operator_tag,
302                );
303                let rhs_ends = self.add_pipeline(
304                    *pipeline_link.rhs,
305                    current_varname,
306                    current_loop,
307                    operator_tag,
308                );
309
310                self.connect_ends(lhs_ends, rhs_ends)
311            }
312            Pipeline::Operator(operator) => {
313                let op_span = Some(operator.span());
314                let (node_id, ends) =
315                    self.add_operator(current_varname, current_loop, operator, op_span);
316                if let Some(operator_tag) = operator_tag {
317                    self.flat_graph
318                        .set_operator_tag(node_id, operator_tag.to_owned());
319                }
320                ends
321            }
322        }
323    }
324
325    /// Connects two [`Ends`] together. Returns the outer [`Ends`] for the connection.
326    ///
327    /// Links the inner ends together by adding it to `self.links`.
328    fn connect_ends(&mut self, lhs_ends: Ends, rhs_ends: Ends) -> Ends {
329        // Outer (first and last) ends.
330        let outer_ends = Ends {
331            inn: lhs_ends.inn,
332            out: rhs_ends.out,
333        };
334        // Inner (link) ends.
335        let link_ends = Ends {
336            out: lhs_ends.out,
337            inn: rhs_ends.inn,
338        };
339        self.links.push(link_ends);
340        outer_ends
341    }
342
343    /// Adds an operator to the graph, returning its [`GraphNodeId`] the input and output [`Ends`] for it.
344    fn add_operator(
345        &mut self,
346        current_varname: Option<&Ident>,
347        current_loop: Option<GraphLoopId>,
348        operator: Operator,
349        op_span: Option<Span>,
350    ) -> (GraphNodeId, Ends) {
351        let node_id = self.flat_graph.insert_node(
352            GraphNode::Operator(operator),
353            current_varname.cloned(),
354            current_loop,
355        );
356        let ends = Ends {
357            inn: Some((
358                PortIndexValue::Elided(op_span),
359                GraphDet::Determined(node_id),
360            )),
361            out: Some((
362                PortIndexValue::Elided(op_span),
363                GraphDet::Determined(node_id),
364            )),
365        };
366        (node_id, ends)
367    }
368
369    /// Connects operator links as a final building step. Processes all the links stored in
370    /// `self.links` and actually puts them into the graph.
371    fn finalize_connect_operator_links(&mut self) {
372        // `->` edges
373        for Ends { out, inn } in std::mem::take(&mut self.links) {
374            let out_opt = self.helper_resolve_name(out, false);
375            let inn_opt = self.helper_resolve_name(inn, true);
376            // `None` already have errors in `self.diagnostics`.
377            if let (Some((out_port, out_node)), Some((inn_port, inn_node))) = (out_opt, inn_opt) {
378                let _ = self.finalize_connect_operators(out_port, out_node, inn_port, inn_node);
379            }
380        }
381
382        // Resolve the singleton references for each node.
383        for node_id in self.flat_graph.node_ids().collect::<Vec<_>>() {
384            if let GraphNode::Operator(operator) = self.flat_graph.node(node_id) {
385                let singletons_referenced = operator
386                    .singletons_referenced
387                    .clone()
388                    .into_iter()
389                    .map(|singleton_ref| {
390                        let port_det = self
391                            .varname_ends
392                            .get(&singleton_ref)
393                            .filter(|varname_info| !varname_info.illegal_cycle)
394                            .map(|varname_info| &varname_info.ends)
395                            .and_then(|ends| ends.out.as_ref())
396                            .cloned();
397                        if let Some((_port, node_id)) = self.helper_resolve_name(port_det, false) {
398                            Some(node_id)
399                        } else {
400                            self.diagnostics.push(Diagnostic::spanned(
401                                singleton_ref.span(),
402                                Level::Error,
403                                format!(
404                                    "Cannot find referenced name `{}`; name was never assigned.",
405                                    singleton_ref
406                                ),
407                            ));
408                            None
409                        }
410                    })
411                    .collect();
412
413                self.flat_graph
414                    .set_node_singleton_references(node_id, singletons_referenced);
415            }
416        }
417    }
418
419    /// Recursively resolve a variable name. For handling forward (and backward) name references
420    /// after all names have been assigned.
421    /// Returns `None` if the name is not resolvable, either because it was never assigned or
422    /// because it contains a self-referential cycle.
423    ///
424    /// `is_in` set to `true` means the _input_ side will be returned. `false` means the _output_ side will be returned.
425    fn helper_resolve_name(
426        &mut self,
427        mut port_det: Option<(PortIndexValue, GraphDet)>,
428        is_in: bool,
429    ) -> Option<(PortIndexValue, GraphNodeId)> {
430        const BACKUP_RECURSION_LIMIT: usize = 1024;
431
432        let mut names = Vec::new();
433        for _ in 0..BACKUP_RECURSION_LIMIT {
434            match port_det? {
435                (port, GraphDet::Determined(node_id)) => {
436                    return Some((port, node_id));
437                }
438                (port, GraphDet::Undetermined(ident)) => {
439                    let Some(varname_info) = self.varname_ends.get_mut(&ident) else {
440                        self.diagnostics.push(Diagnostic::spanned(
441                            ident.span(),
442                            Level::Error,
443                            format!("Cannot find name `{}`; name was never assigned.", ident),
444                        ));
445                        return None;
446                    };
447                    // Check for a self-referential cycle.
448                    let cycle_found = names.contains(&ident);
449                    if !cycle_found {
450                        names.push(ident);
451                    };
452                    if cycle_found || varname_info.illegal_cycle {
453                        let len = names.len();
454                        for (i, name) in names.into_iter().enumerate() {
455                            self.diagnostics.push(Diagnostic::spanned(
456                                name.span(),
457                                Level::Error,
458                                format!(
459                                    "Name `{}` forms or references an illegal self-referential cycle ({}/{}).",
460                                    name,
461                                    i + 1,
462                                    len
463                                ),
464                            ));
465                            // Set value as `Err(())` to trigger `name_ends_result.is_err()`
466                            // diagnostics above if the name is referenced in the future.
467                            self.varname_ends.get_mut(&name).unwrap().illegal_cycle = true;
468                        }
469                        return None;
470                    }
471
472                    // No self-cycle.
473                    let prev = if is_in {
474                        varname_info.inn_used = true;
475                        &varname_info.ends.inn
476                    } else {
477                        varname_info.out_used = true;
478                        &varname_info.ends.out
479                    };
480                    port_det = Self::helper_combine_end(
481                        &mut self.diagnostics,
482                        prev.clone(),
483                        port,
484                        if is_in { "input" } else { "output" },
485                    );
486                }
487            }
488        }
489        self.diagnostics.push(Diagnostic::spanned(
490            Span::call_site(),
491            Level::Error,
492            format!(
493                "Reached the recursion limit {} while resolving names. This is either a dfir bug or you have an absurdly long chain of names: `{}`.",
494                BACKUP_RECURSION_LIMIT,
495                names.iter().map(ToString::to_string).collect::<Vec<_>>().join("` -> `"),
496            )
497        ));
498        None
499    }
500
501    /// Connect two operators on the given port indexes.
502    fn finalize_connect_operators(
503        &mut self,
504        src_port: PortIndexValue,
505        src: GraphNodeId,
506        dst_port: PortIndexValue,
507        dst: GraphNodeId,
508    ) -> GraphEdgeId {
509        {
510            /// Helper to emit conflicts when a port is used twice.
511            fn emit_conflict(
512                inout: &str,
513                old: &PortIndexValue,
514                new: &PortIndexValue,
515                diagnostics: &mut Vec<Diagnostic>,
516            ) {
517                // TODO(mingwei): Use `MultiSpan` once `proc_macro2` supports it.
518                diagnostics.push(Diagnostic::spanned(
519                    old.span(),
520                    Level::Error,
521                    format!(
522                        "{} connection conflicts with below ({}) (1/2)",
523                        inout,
524                        PrettySpan(new.span()),
525                    ),
526                ));
527                diagnostics.push(Diagnostic::spanned(
528                    new.span(),
529                    Level::Error,
530                    format!(
531                        "{} connection conflicts with above ({}) (2/2)",
532                        inout,
533                        PrettySpan(old.span()),
534                    ),
535                ));
536            }
537
538            // Handle src's successor port conflicts:
539            if src_port.is_specified() {
540                for conflicting_port in self
541                    .flat_graph
542                    .node_successor_edges(src)
543                    .map(|edge_id| self.flat_graph.edge_ports(edge_id).0)
544                    .filter(|&port| port == &src_port)
545                {
546                    emit_conflict("Output", conflicting_port, &src_port, &mut self.diagnostics);
547                }
548            }
549
550            // Handle dst's predecessor port conflicts:
551            if dst_port.is_specified() {
552                for conflicting_port in self
553                    .flat_graph
554                    .node_predecessor_edges(dst)
555                    .map(|edge_id| self.flat_graph.edge_ports(edge_id).1)
556                    .filter(|&port| port == &dst_port)
557                {
558                    emit_conflict("Input", conflicting_port, &dst_port, &mut self.diagnostics);
559                }
560            }
561        }
562        self.flat_graph.insert_edge(src, src_port, dst, dst_port)
563    }
564
    /// Process operators and emit operator errors.
    ///
    /// Called from [`Self::build`] after the links have been inserted into the graph. Runs the
    /// passes below in a fixed order.
    fn process_operator_errors(&mut self) {
        // Must come first: `check_operator_errors` looks up each node's operator instance and
        // skips nodes that do not have one.
        self.make_operator_instances();
        self.check_operator_errors();
        self.warn_unused_port_indexing();
        self.check_loop_errors();
    }
572
    /// Make `OperatorInstance`s for each operator node.
    ///
    /// Delegates to the graph's `insert_node_op_insts_all`, handing it `self.diagnostics` so
    /// that it can report errors.
    fn make_operator_instances(&mut self) {
        self.flat_graph
            .insert_node_op_insts_all(&mut self.diagnostics);
    }
578
579    /// Validates that operators have valid number of inputs, outputs, & arguments.
580    /// Adds errors (and warnings) to `self.diagnostics`.
581    fn check_operator_errors(&mut self) {
582        for (node_id, node) in self.flat_graph.nodes() {
583            match node {
584                GraphNode::Operator(operator) => {
585                    let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else {
586                        // Error already emitted by `insert_node_op_insts_all`.
587                        continue;
588                    };
589                    let op_constraints = op_inst.op_constraints;
590
591                    // Check number of args
592                    if op_constraints.num_args != operator.args.len() {
593                        self.diagnostics.push(Diagnostic::spanned(
594                            operator.span(),
595                            Level::Error,
596                            format!(
597                                "expected {} argument(s), found {}",
598                                op_constraints.num_args,
599                                operator.args.len()
600                            ),
601                        ));
602                    }
603
604                    // Check input/output (port) arity
605                    /// Returns true if an error was found.
606                    fn emit_arity_error(
607                        operator: &Operator,
608                        is_in: bool,
609                        is_hard: bool,
610                        degree: usize,
611                        range: &dyn RangeTrait<usize>,
612                        diagnostics: &mut Vec<Diagnostic>,
613                    ) -> bool {
614                        let op_name = &*operator.name_string();
615                        let message = format!(
616                            "`{}` {} have {} {}, actually has {}.",
617                            op_name,
618                            if is_hard { "must" } else { "should" },
619                            range.human_string(),
620                            if is_in { "input(s)" } else { "output(s)" },
621                            degree,
622                        );
623                        let out_of_range = !range.contains(&degree);
624                        if out_of_range {
625                            diagnostics.push(Diagnostic::spanned(
626                                operator.span(),
627                                if is_hard {
628                                    Level::Error
629                                } else {
630                                    Level::Warning
631                                },
632                                message,
633                            ));
634                        }
635                        out_of_range
636                    }
637
638                    let inn_degree = self.flat_graph.node_degree_in(node_id);
639                    let _ = emit_arity_error(
640                        operator,
641                        true,
642                        true,
643                        inn_degree,
644                        op_constraints.hard_range_inn,
645                        &mut self.diagnostics,
646                    ) || emit_arity_error(
647                        operator,
648                        true,
649                        false,
650                        inn_degree,
651                        op_constraints.soft_range_inn,
652                        &mut self.diagnostics,
653                    );
654
655                    let out_degree = self.flat_graph.node_degree_out(node_id);
656                    let _ = emit_arity_error(
657                        operator,
658                        false,
659                        true,
660                        out_degree,
661                        op_constraints.hard_range_out,
662                        &mut self.diagnostics,
663                    ) || emit_arity_error(
664                        operator,
665                        false,
666                        false,
667                        out_degree,
668                        op_constraints.soft_range_out,
669                        &mut self.diagnostics,
670                    );
671
672                    fn emit_port_error<'a>(
673                        operator_span: Span,
674                        expected_ports_fn: Option<fn() -> PortListSpec>,
675                        actual_ports_iter: impl Iterator<Item = &'a PortIndexValue>,
676                        input_output: &'static str,
677                        diagnostics: &mut Vec<Diagnostic>,
678                    ) {
679                        let Some(expected_ports_fn) = expected_ports_fn else {
680                            return;
681                        };
682                        let PortListSpec::Fixed(expected_ports) = (expected_ports_fn)() else {
683                            // Separate check inside of `demux` special case.
684                            return;
685                        };
686                        let expected_ports: Vec<_> = expected_ports.into_iter().collect();
687
688                        // Reject unexpected ports.
689                        let ports: BTreeSet<_> = actual_ports_iter
690                            // Use `inspect` before collecting into `BTreeSet` to ensure we get
691                            // both error messages on duplicated port names.
692                            .inspect(|actual_port_iv| {
693                                // For each actually used port `port_index_value`, check if it is expected.
694                                let is_expected = expected_ports.iter().any(|port_index| {
695                                    actual_port_iv == &&port_index.clone().into()
696                                });
697                                // If it is not expected, emit a diagnostic error.
698                                if !is_expected {
699                                    diagnostics.push(Diagnostic::spanned(
700                                        actual_port_iv.span(),
701                                        Level::Error,
702                                        format!(
703                                            "Unexpected {} port: {}. Expected one of: `{}`",
704                                            input_output,
705                                            actual_port_iv.as_error_message_string(),
706                                            Itertools::intersperse(
707                                                expected_ports
708                                                    .iter()
709                                                    .map(|port| Cow::Owned(
710                                                        port.to_token_stream().to_string()
711                                                    )),
712                                                Cow::Borrowed("`, `")
713                                            ).collect::<String>()
714                                        ),
715                                    ))
716                                }
717                            })
718                            .collect();
719
720                        // List missing expected ports.
721                        let missing: Vec<_> = expected_ports
722                            .into_iter()
723                            .filter_map(|expected_port| {
724                                let tokens = expected_port.to_token_stream();
725                                if !ports.contains(&&expected_port.into()) {
726                                    Some(tokens)
727                                } else {
728                                    None
729                                }
730                            })
731                            .collect();
732                        if !missing.is_empty() {
733                            diagnostics.push(Diagnostic::spanned(
734                                operator_span,
735                                Level::Error,
736                                format!(
737                                    "Missing expected {} port(s): `{}`.",
738                                    input_output,
739                                    Itertools::intersperse(
740                                        missing.into_iter().map(|port| Cow::Owned(
741                                            port.to_token_stream().to_string()
742                                        )),
743                                        Cow::Borrowed("`, `")
744                                    )
745                                    .collect::<String>()
746                                ),
747                            ));
748                        }
749                    }
750
751                    emit_port_error(
752                        operator.span(),
753                        op_constraints.ports_inn,
754                        self.flat_graph
755                            .node_predecessor_edges(node_id)
756                            .map(|edge_id| self.flat_graph.edge_ports(edge_id).1),
757                        "input",
758                        &mut self.diagnostics,
759                    );
760                    emit_port_error(
761                        operator.span(),
762                        op_constraints.ports_out,
763                        self.flat_graph
764                            .node_successor_edges(node_id)
765                            .map(|edge_id| self.flat_graph.edge_ports(edge_id).0),
766                        "output",
767                        &mut self.diagnostics,
768                    );
769
770                    // Check that singleton references actually reference *stateful* operators.
771                    {
772                        let singletons_resolved =
773                            self.flat_graph.node_singleton_references(node_id);
774                        for (singleton_node_id, singleton_ident) in singletons_resolved
775                            .iter()
776                            .zip_eq(&*operator.singletons_referenced)
777                        {
778                            let &Some(singleton_node_id) = singleton_node_id else {
779                                // Error already emitted by `connect_operator_links`, "Cannot find referenced name...".
780                                continue;
781                            };
782                            let Some(ref_op_inst) = self.flat_graph.node_op_inst(singleton_node_id)
783                            else {
784                                // Error already emitted by `insert_node_op_insts_all`.
785                                continue;
786                            };
787                            let ref_op_constraints = ref_op_inst.op_constraints;
788                            if !ref_op_constraints.has_singleton_output {
789                                self.diagnostics.push(Diagnostic::spanned(
790                                    singleton_ident.span(),
791                                    Level::Error,
792                                    format!(
793                                        "Cannot reference operator `{}`. Only operators with singleton state can be referenced.",
794                                        ref_op_constraints.name,
795                                    ),
796                                ));
797                            }
798                        }
799                    }
800                }
801                GraphNode::Handoff { .. } => todo!("Node::Handoff"),
802                GraphNode::ModuleBoundary { .. } => {
803                    // Module boundaries don't require any checking.
804                }
805            }
806        }
807    }
808
809    /// Warns about unused port indexing referenced in [`Self::varname_ends`].
810    /// https://github.com/hydro-project/hydro/issues/1108
811    fn warn_unused_port_indexing(&mut self) {
812        for (_ident, varname_info) in self.varname_ends.iter() {
813            if !varname_info.inn_used {
814                Self::helper_check_unused_port(&mut self.diagnostics, &varname_info.ends, true);
815            }
816            if !varname_info.out_used {
817                Self::helper_check_unused_port(&mut self.diagnostics, &varname_info.ends, false);
818            }
819        }
820    }
821
822    /// Emit a warning to `diagnostics` for an unused port (i.e. if the port is specified for
823    /// reason).
824    fn helper_check_unused_port(diagnostics: &mut Vec<Diagnostic>, ends: &Ends, is_in: bool) {
825        let port = if is_in { &ends.inn } else { &ends.out };
826        if let Some((port, _)) = port {
827            if port.is_specified() {
828                diagnostics.push(Diagnostic::spanned(
829                    port.span(),
830                    Level::Error,
831                    format!(
832                        "{} port index is unused. (Is the port on the correct side?)",
833                        if is_in { "Input" } else { "Output" },
834                    ),
835                ));
836            }
837        }
838    }
839
840    /// Helper function.
841    /// Combine the port indexing information for indexing wrapped around a name.
842    /// Because the name may already have indexing, this may introduce double indexing (i.e. `[0][0]my_var[0][0]`)
843    /// which would be an error.
844    fn helper_combine_ends(
845        diagnostics: &mut Vec<Diagnostic>,
846        og_ends: Ends,
847        inn_port: PortIndexValue,
848        out_port: PortIndexValue,
849    ) -> Ends {
850        Ends {
851            inn: Self::helper_combine_end(diagnostics, og_ends.inn, inn_port, "input"),
852            out: Self::helper_combine_end(diagnostics, og_ends.out, out_port, "output"),
853        }
854    }
855
856    /// Helper function.
857    /// Combine the port indexing info for one input or output.
858    fn helper_combine_end(
859        diagnostics: &mut Vec<Diagnostic>,
860        og: Option<(PortIndexValue, GraphDet)>,
861        other: PortIndexValue,
862        input_output: &'static str,
863    ) -> Option<(PortIndexValue, GraphDet)> {
864        // TODO(mingwei): minification pass over this code?
865
866        let other_span = other.span();
867
868        let (og_port, og_node) = og?;
869        match og_port.combine(other) {
870            Ok(combined_port) => Some((combined_port, og_node)),
871            Err(og_port) => {
872                // TODO(mingwei): Use `MultiSpan` once `proc_macro2` supports it.
873                diagnostics.push(Diagnostic::spanned(
874                    og_port.span(),
875                    Level::Error,
876                    format!(
877                        "Indexing on {} is overwritten below ({}) (1/2).",
878                        input_output,
879                        PrettySpan(other_span),
880                    ),
881                ));
882                diagnostics.push(Diagnostic::spanned(
883                    other_span,
884                    Level::Error,
885                    format!(
886                        "Cannot index on already-indexed {}, previously indexed above ({}) (2/2).",
887                        input_output,
888                        PrettySpan(og_port.span()),
889                    ),
890                ));
891                // When errored, just use original and ignore OTHER port to minimize
892                // noisy/extra diagnostics.
893                Some((og_port, og_node))
894            }
895        }
896    }
897
898    /// Check for loop context-related errors.
899    fn check_loop_errors(&mut self) {
900        for (node_id, node) in self.flat_graph.nodes() {
901            let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else {
902                continue;
903            };
904            let Some(_loop_id) = self.flat_graph.node_loop(node_id) else {
905                continue;
906            };
907
908            // All inputs must be declared in the root block.
909            if Some(FloType::Source) == op_inst.op_constraints.flo_type {
910                self.diagnostics.push(Diagnostic::spanned(
911                    node.span(),
912                    Level::Error,
913                    format!(
914                        "Source operator `{}(...)` must be at the root level, not within any `loop {{ ... }}` contexts.",
915                        op_inst.op_constraints.name
916                    )
917                ));
918            }
919
920            // Ensure no `'tick` or `'static` persistences are used in a loop context.
921            for persistence in &op_inst.generics.persistence_args {
922                if let Persistence::Tick | Persistence::Static = persistence {
923                    self.diagnostics.push(Diagnostic::spanned(
924                        op_inst.generics.generic_args.span(),
925                        Level::Error,
926                        "Operator uses a `'tick` or `'static` persistence, which is not allowed within a `loop {{ ... }}` context."
927                    ));
928                }
929            }
930        }
931
932        // Check windowing and un-windowing operators, for loop inputs and outputs respectively.
933        for (_edge_id, (pred_id, node_id)) in self.flat_graph.edges() {
934            let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else {
935                continue;
936            };
937            let flo_type = &op_inst.op_constraints.flo_type;
938
939            let pred_loop_id = self.flat_graph.node_loop(pred_id);
940            let loop_id = self.flat_graph.node_loop(node_id);
941
942            let span = self.flat_graph.node(node_id).span();
943
944            let (is_input, is_output) = {
945                let parent_pred_loop_id =
946                    pred_loop_id.and_then(|lid| self.flat_graph.loop_parent(lid));
947                let parent_loop_id = loop_id.and_then(|lid| self.flat_graph.loop_parent(lid));
948                let is_same = pred_loop_id == loop_id;
949                let is_input = !is_same && parent_loop_id == pred_loop_id;
950                let is_output = !is_same && parent_pred_loop_id == loop_id;
951                if !(is_input || is_output || is_same) {
952                    self.diagnostics.push(Diagnostic::spanned(
953                        span,
954                        Level::Error,
955                        "Operator input edge may not cross multiple loop contexts.",
956                    ));
957                    continue;
958                }
959                (is_input, is_output)
960            };
961
962            match flo_type {
963                None => {
964                    if is_input {
965                        self.diagnostics.push(Diagnostic::spanned(
966                            span,
967                            Level::Error,
968                            format!(
969                                "Operator `{}(...)` entering a loop context must be a windowing operator, but is not.",
970                                op_inst.op_constraints.name
971                            )
972                        ));
973                    }
974                    if is_output {
975                        self.diagnostics.push(Diagnostic::spanned(
976                            span,
977                            Level::Error,
978                            format!(
979                                "Operator `{}(...)` exiting a loop context must be an un-windowing operator, but is not.",
980                                op_inst.op_constraints.name
981                            )
982                        ));
983                    }
984                }
985                Some(FloType::Windowing) => {
986                    if !is_input {
987                        self.diagnostics.push(Diagnostic::spanned(
988                            span,
989                            Level::Error,
990                            format!(
991                                "Windowing operator `{}(...)` must be the first input operator into a `loop {{ ... }} context.",
992                                op_inst.op_constraints.name
993                            )
994                        ));
995                    }
996                }
997                Some(FloType::Unwindowing) => {
998                    if !is_output {
999                        self.diagnostics.push(Diagnostic::spanned(
1000                            span,
1001                            Level::Error,
1002                            format!(
1003                                "Un-windowing operator `{}(...)` must be the first output operator after exiting a `loop {{ ... }} context.",
1004                                op_inst.op_constraints.name
1005                            )
1006                        ));
1007                    }
1008                }
1009                Some(FloType::NextIteration) => {
1010                    // Must be in a loop context.
1011                    if loop_id.is_none() {
1012                        self.diagnostics.push(Diagnostic::spanned(
1013                            span,
1014                            Level::Error,
1015                            format!(
1016                                "Operator `{}(...)` must be within a `loop {{ ... }}` context.",
1017                                op_inst.op_constraints.name
1018                            ),
1019                        ));
1020                    }
1021                }
1022                Some(FloType::Source) => {
1023                    // Handled above.
1024                }
1025            }
1026        }
1027
1028        // Must be a DAG (excluding `next_iteration()` operators).
1029        // TODO(mingwei): Nested loop blocks should count as a single node.
1030        // But this doesn't cause any correctness issues because the nested loops are also DAGs.
1031        for (loop_id, loop_nodes) in self.flat_graph.loops() {
1032            // Filter out `next_iteration()` operators.
1033            let filter_next_iteration = |&node_id: &GraphNodeId| {
1034                self.flat_graph
1035                    .node_op_inst(node_id)
1036                    .map(|op_inst| Some(FloType::NextIteration) != op_inst.op_constraints.flo_type)
1037                    .unwrap_or(true)
1038            };
1039
1040            let topo_sort_result = graph_algorithms::topo_sort(
1041                loop_nodes.iter().copied().filter(filter_next_iteration),
1042                |dst| {
1043                    self.flat_graph
1044                        .node_predecessor_nodes(dst)
1045                        .filter(|&src| Some(loop_id) == self.flat_graph.node_loop(src))
1046                        .filter(filter_next_iteration)
1047                },
1048            );
1049            if let Err(cycle) = topo_sort_result {
1050                let len = cycle.len();
1051                for (i, node_id) in cycle.into_iter().enumerate() {
1052                    let span = self.flat_graph.node(node_id).span();
1053                    self.diagnostics.push(Diagnostic::spanned(
1054                        span,
1055                        Level::Error,
1056                        format!(
1057                            "Operator forms an illegal cycle within a `loop {{ ... }}` block. Use `{}()` to pass data across loop iterations. ({}/{})",
1058                            NEXT_ITERATION.name,
1059                            i + 1,
1060                            len,
1061                        ),
1062                    ));
1063                }
1064            }
1065        }
1066    }
1067}