1use std::borrow::Cow;
4use std::collections::btree_map::Entry;
5use std::collections::{BTreeMap, BTreeSet};
6
7use itertools::Itertools;
8use proc_macro2::Span;
9use quote::ToTokens;
10use syn::spanned::Spanned;
11use syn::{Error, Ident, ItemUse};
12
13use super::ops::next_iteration::NEXT_ITERATION;
14use super::ops::{FloType, Persistence};
15use super::{DfirGraph, GraphEdgeId, GraphLoopId, GraphNode, GraphNodeId, PortIndexValue};
16use crate::diagnostic::{Diagnostic, Level};
17use crate::graph::graph_algorithms;
18use crate::graph::ops::{PortListSpec, RangeTrait};
19use crate::parse::{DfirCode, DfirStatement, Operator, Pipeline};
20use crate::pretty_span::PrettySpan;
21
/// The two open ends of a (partial) pipeline: the side data flows into (`inn`) and the side data
/// flows out of (`out`).
///
/// Each end pairs a port index with the node it belongs to, which may still be an unresolved
/// variable name (see [`GraphDet`]). An end is `None` when there is nothing to connect on that
/// side, e.g. after a `mod` boundary was used outside of a module.
#[derive(Clone, Debug)]
struct Ends {
    /// Input end: port index plus the (possibly not yet resolved) receiving node.
    inn: Option<(PortIndexValue, GraphDet)>,
    /// Output end: port index plus the (possibly not yet resolved) emitting node.
    out: Option<(PortIndexValue, GraphDet)>,
}
27
/// Whether the node at one end of a pipeline is already known.
#[derive(Clone, Debug)]
enum GraphDet {
    /// The end refers directly to a node that already exists in the graph.
    Determined(GraphNodeId),
    /// The end refers to a variable name; it is looked up in `varname_ends` later, by
    /// `helper_resolve_name`, so names may be referenced before they are assigned.
    Undetermined(Ident),
}
33
/// Bookkeeping stored for each assigned variable name.
#[derive(Debug)]
struct VarnameInfo {
    /// The input/output ends of the pipeline assigned to the name.
    pub ends: Ends,
    /// Set by `helper_resolve_name` once the name is found to be part of (or to reference) a
    /// self-referential cycle of names; later resolutions through it fail immediately.
    pub illegal_cycle: bool,
    /// Set when the name has been resolved as an input end at least once.
    pub inn_used: bool,
    /// Set when the name has been resolved as an output end at least once.
    pub out_used: bool,
}
46impl VarnameInfo {
47 pub fn new(ends: Ends) -> Self {
48 Self {
49 ends,
50 illegal_cycle: false,
51 inn_used: false,
52 out_used: false,
53 }
54 }
55}
56
/// Incrementally builds a flat [`DfirGraph`] from parsed DFIR statements, collecting
/// diagnostics along the way. Call [`FlatGraphBuilder::build`] to finish.
#[derive(Debug, Default)]
pub struct FlatGraphBuilder {
    /// Errors and warnings accumulated so far; returned from `build`.
    diagnostics: Vec<Diagnostic>,

    /// The graph under construction.
    flat_graph: DfirGraph,
    /// Ends and usage flags for every assigned variable name.
    varname_ends: BTreeMap<Ident, VarnameInfo>,
    /// Pending connections (each `out` feeds each `inn`), recorded by `connect_ends` and turned
    /// into edges by `finalize_connect_operator_links` once all names are known.
    links: Vec<Ends>,

    /// `use` items encountered in the input; returned from `build`.
    uses: Vec<ItemUse>,

    /// `(input_node, output_node)` that `mod` pipelines connect to. `None` makes any use of `mod`
    /// an error. NOTE(review): nothing in this part of the file sets it to `Some` — confirm
    /// where module boundaries are installed.
    module_boundary_nodes: Option<(GraphNodeId, GraphNodeId)>,
}
77
78impl FlatGraphBuilder {
    /// Creates an empty builder (identical to `Default::default()`).
    pub fn new() -> Self {
        Default::default()
    }
83
84 pub fn from_dfir(input: DfirCode) -> Self {
86 let mut builder = Self::default();
87 builder.add_dfir(input, None, None);
88 builder
89 }
90
91 pub fn build(mut self) -> (DfirGraph, Vec<ItemUse>, Vec<Diagnostic>) {
97 self.finalize_connect_operator_links();
98 self.process_operator_errors();
99
100 (self.flat_graph, self.uses, self.diagnostics)
101 }
102
103 pub fn add_dfir(
109 &mut self,
110 dfir: DfirCode,
111 current_loop: Option<GraphLoopId>,
112 operator_tag: Option<&str>,
113 ) {
114 for stmt in dfir.statements {
115 self.add_statement_internal(stmt, current_loop, operator_tag);
116 }
117 }
118
    /// Adds a single statement at the root level: no enclosing loop and no operator tag.
    pub fn add_statement(&mut self, stmt: DfirStatement) {
        self.add_statement_internal(stmt, None, None);
    }
123
124 fn add_statement_internal(
130 &mut self,
131 stmt: DfirStatement,
132 current_loop: Option<GraphLoopId>,
133 operator_tag: Option<&str>,
134 ) {
135 match stmt {
136 DfirStatement::Use(yuse) => {
137 self.uses.push(yuse);
138 }
139 DfirStatement::Named(named) => {
140 let stmt_span = named.span();
141 let ends = self.add_pipeline(
142 named.pipeline,
143 Some(&named.name),
144 current_loop,
145 operator_tag,
146 );
147 self.assign_varname_checked(named.name, stmt_span, ends);
148 }
149 DfirStatement::Pipeline(pipeline_stmt) => {
150 let ends =
151 self.add_pipeline(pipeline_stmt.pipeline, None, current_loop, operator_tag);
152 Self::helper_check_unused_port(&mut self.diagnostics, &ends, true);
153 Self::helper_check_unused_port(&mut self.diagnostics, &ends, false);
154 }
155 DfirStatement::Loop(loop_statement) => {
156 let inner_loop = self.flat_graph.insert_loop(current_loop);
157 for stmt in loop_statement.statements {
158 self.add_statement_internal(stmt, Some(inner_loop), operator_tag);
159 }
160 }
161 }
162 }
163
164 pub fn append_assign_pipeline(
176 &mut self,
177 asgn_name: Option<&Ident>,
178 pred_name: Option<&Ident>,
179 pipeline: Pipeline,
180 current_loop: Option<GraphLoopId>,
181 operator_tag: Option<&str>,
182 ) {
183 let span = pipeline.span();
184 let mut ends = self.add_pipeline(pipeline, asgn_name, current_loop, operator_tag);
185
186 if let Some(pred_name) = pred_name {
188 if let Some(pred_varname_info) = self.varname_ends.get(pred_name) {
189 ends = self.connect_ends(pred_varname_info.ends.clone(), ends);
191 } else {
192 self.diagnostics.push(Diagnostic::spanned(
193 pred_name.span(),
194 Level::Error,
195 format!(
196 "Cannot find referenced name `{}`; name was never assigned.",
197 pred_name
198 ),
199 ));
200 }
201 }
202
203 if let Some(asgn_name) = asgn_name {
205 self.assign_varname_checked(asgn_name.clone(), span, ends);
206 }
207 }
208}
209
210impl FlatGraphBuilder {
212 fn assign_varname_checked(&mut self, name: Ident, stmt_span: Span, ends: Ends) {
214 match self.varname_ends.entry(name) {
215 Entry::Vacant(vacant_entry) => {
216 vacant_entry.insert(VarnameInfo::new(ends));
217 }
218 Entry::Occupied(occupied_entry) => {
219 let prev_conflict = occupied_entry.key();
220 self.diagnostics.push(Diagnostic::spanned(
221 prev_conflict.span(),
222 Level::Error,
223 format!(
224 "Existing assignment to `{}` conflicts with later assignment: {} (1/2)",
225 prev_conflict,
226 PrettySpan(stmt_span),
227 ),
228 ));
229 self.diagnostics.push(Diagnostic::spanned(
230 stmt_span,
231 Level::Error,
232 format!(
233 "Name assignment to `{}` conflicts with existing assignment: {} (2/2)",
234 prev_conflict,
235 PrettySpan(prev_conflict.span())
236 ),
237 ));
238 }
239 }
240 }
241
242 fn add_pipeline(
244 &mut self,
245 pipeline: Pipeline,
246 current_varname: Option<&Ident>,
247 current_loop: Option<GraphLoopId>,
248 operator_tag: Option<&str>,
249 ) -> Ends {
250 match pipeline {
251 Pipeline::Paren(ported_pipeline_paren) => {
252 let (inn_port, pipeline_paren, out_port) =
253 PortIndexValue::from_ported(ported_pipeline_paren);
254 let og_ends = self.add_pipeline(
255 *pipeline_paren.pipeline,
256 current_varname,
257 current_loop,
258 operator_tag,
259 );
260 Self::helper_combine_ends(&mut self.diagnostics, og_ends, inn_port, out_port)
261 }
262 Pipeline::Name(pipeline_name) => {
263 let (inn_port, ident, out_port) = PortIndexValue::from_ported(pipeline_name);
264
265 Ends {
268 inn: Some((inn_port, GraphDet::Undetermined(ident.clone()))),
269 out: Some((out_port, GraphDet::Undetermined(ident))),
270 }
271 }
272 Pipeline::ModuleBoundary(pipeline_name) => {
273 let Some((input_node, output_node)) = self.module_boundary_nodes else {
274 self.diagnostics.push(
275 Error::new(
276 pipeline_name.span(),
277 "`mod` is only usable inside of a module.",
278 )
279 .into(),
280 );
281
282 return Ends {
283 inn: None,
284 out: None,
285 };
286 };
287
288 let (inn_port, _, out_port) = PortIndexValue::from_ported(pipeline_name);
289
290 Ends {
291 inn: Some((inn_port, GraphDet::Determined(output_node))),
292 out: Some((out_port, GraphDet::Determined(input_node))),
293 }
294 }
295 Pipeline::Link(pipeline_link) => {
296 let lhs_ends = self.add_pipeline(
298 *pipeline_link.lhs,
299 current_varname,
300 current_loop,
301 operator_tag,
302 );
303 let rhs_ends = self.add_pipeline(
304 *pipeline_link.rhs,
305 current_varname,
306 current_loop,
307 operator_tag,
308 );
309
310 self.connect_ends(lhs_ends, rhs_ends)
311 }
312 Pipeline::Operator(operator) => {
313 let op_span = Some(operator.span());
314 let (node_id, ends) =
315 self.add_operator(current_varname, current_loop, operator, op_span);
316 if let Some(operator_tag) = operator_tag {
317 self.flat_graph
318 .set_operator_tag(node_id, operator_tag.to_owned());
319 }
320 ends
321 }
322 }
323 }
324
325 fn connect_ends(&mut self, lhs_ends: Ends, rhs_ends: Ends) -> Ends {
329 let outer_ends = Ends {
331 inn: lhs_ends.inn,
332 out: rhs_ends.out,
333 };
334 let link_ends = Ends {
336 out: lhs_ends.out,
337 inn: rhs_ends.inn,
338 };
339 self.links.push(link_ends);
340 outer_ends
341 }
342
343 fn add_operator(
345 &mut self,
346 current_varname: Option<&Ident>,
347 current_loop: Option<GraphLoopId>,
348 operator: Operator,
349 op_span: Option<Span>,
350 ) -> (GraphNodeId, Ends) {
351 let node_id = self.flat_graph.insert_node(
352 GraphNode::Operator(operator),
353 current_varname.cloned(),
354 current_loop,
355 );
356 let ends = Ends {
357 inn: Some((
358 PortIndexValue::Elided(op_span),
359 GraphDet::Determined(node_id),
360 )),
361 out: Some((
362 PortIndexValue::Elided(op_span),
363 GraphDet::Determined(node_id),
364 )),
365 };
366 (node_id, ends)
367 }
368
369 fn finalize_connect_operator_links(&mut self) {
372 for Ends { out, inn } in std::mem::take(&mut self.links) {
374 let out_opt = self.helper_resolve_name(out, false);
375 let inn_opt = self.helper_resolve_name(inn, true);
376 if let (Some((out_port, out_node)), Some((inn_port, inn_node))) = (out_opt, inn_opt) {
378 let _ = self.finalize_connect_operators(out_port, out_node, inn_port, inn_node);
379 }
380 }
381
382 for node_id in self.flat_graph.node_ids().collect::<Vec<_>>() {
384 if let GraphNode::Operator(operator) = self.flat_graph.node(node_id) {
385 let singletons_referenced = operator
386 .singletons_referenced
387 .clone()
388 .into_iter()
389 .map(|singleton_ref| {
390 let port_det = self
391 .varname_ends
392 .get(&singleton_ref)
393 .filter(|varname_info| !varname_info.illegal_cycle)
394 .map(|varname_info| &varname_info.ends)
395 .and_then(|ends| ends.out.as_ref())
396 .cloned();
397 if let Some((_port, node_id)) = self.helper_resolve_name(port_det, false) {
398 Some(node_id)
399 } else {
400 self.diagnostics.push(Diagnostic::spanned(
401 singleton_ref.span(),
402 Level::Error,
403 format!(
404 "Cannot find referenced name `{}`; name was never assigned.",
405 singleton_ref
406 ),
407 ));
408 None
409 }
410 })
411 .collect();
412
413 self.flat_graph
414 .set_node_singleton_references(node_id, singletons_referenced);
415 }
416 }
417 }
418
419 fn helper_resolve_name(
426 &mut self,
427 mut port_det: Option<(PortIndexValue, GraphDet)>,
428 is_in: bool,
429 ) -> Option<(PortIndexValue, GraphNodeId)> {
430 const BACKUP_RECURSION_LIMIT: usize = 1024;
431
432 let mut names = Vec::new();
433 for _ in 0..BACKUP_RECURSION_LIMIT {
434 match port_det? {
435 (port, GraphDet::Determined(node_id)) => {
436 return Some((port, node_id));
437 }
438 (port, GraphDet::Undetermined(ident)) => {
439 let Some(varname_info) = self.varname_ends.get_mut(&ident) else {
440 self.diagnostics.push(Diagnostic::spanned(
441 ident.span(),
442 Level::Error,
443 format!("Cannot find name `{}`; name was never assigned.", ident),
444 ));
445 return None;
446 };
447 let cycle_found = names.contains(&ident);
449 if !cycle_found {
450 names.push(ident);
451 };
452 if cycle_found || varname_info.illegal_cycle {
453 let len = names.len();
454 for (i, name) in names.into_iter().enumerate() {
455 self.diagnostics.push(Diagnostic::spanned(
456 name.span(),
457 Level::Error,
458 format!(
459 "Name `{}` forms or references an illegal self-referential cycle ({}/{}).",
460 name,
461 i + 1,
462 len
463 ),
464 ));
465 self.varname_ends.get_mut(&name).unwrap().illegal_cycle = true;
468 }
469 return None;
470 }
471
472 let prev = if is_in {
474 varname_info.inn_used = true;
475 &varname_info.ends.inn
476 } else {
477 varname_info.out_used = true;
478 &varname_info.ends.out
479 };
480 port_det = Self::helper_combine_end(
481 &mut self.diagnostics,
482 prev.clone(),
483 port,
484 if is_in { "input" } else { "output" },
485 );
486 }
487 }
488 }
489 self.diagnostics.push(Diagnostic::spanned(
490 Span::call_site(),
491 Level::Error,
492 format!(
493 "Reached the recursion limit {} while resolving names. This is either a dfir bug or you have an absurdly long chain of names: `{}`.",
494 BACKUP_RECURSION_LIMIT,
495 names.iter().map(ToString::to_string).collect::<Vec<_>>().join("` -> `"),
496 )
497 ));
498 None
499 }
500
501 fn finalize_connect_operators(
503 &mut self,
504 src_port: PortIndexValue,
505 src: GraphNodeId,
506 dst_port: PortIndexValue,
507 dst: GraphNodeId,
508 ) -> GraphEdgeId {
509 {
510 fn emit_conflict(
512 inout: &str,
513 old: &PortIndexValue,
514 new: &PortIndexValue,
515 diagnostics: &mut Vec<Diagnostic>,
516 ) {
517 diagnostics.push(Diagnostic::spanned(
519 old.span(),
520 Level::Error,
521 format!(
522 "{} connection conflicts with below ({}) (1/2)",
523 inout,
524 PrettySpan(new.span()),
525 ),
526 ));
527 diagnostics.push(Diagnostic::spanned(
528 new.span(),
529 Level::Error,
530 format!(
531 "{} connection conflicts with above ({}) (2/2)",
532 inout,
533 PrettySpan(old.span()),
534 ),
535 ));
536 }
537
538 if src_port.is_specified() {
540 for conflicting_port in self
541 .flat_graph
542 .node_successor_edges(src)
543 .map(|edge_id| self.flat_graph.edge_ports(edge_id).0)
544 .filter(|&port| port == &src_port)
545 {
546 emit_conflict("Output", conflicting_port, &src_port, &mut self.diagnostics);
547 }
548 }
549
550 if dst_port.is_specified() {
552 for conflicting_port in self
553 .flat_graph
554 .node_predecessor_edges(dst)
555 .map(|edge_id| self.flat_graph.edge_ports(edge_id).1)
556 .filter(|&port| port == &dst_port)
557 {
558 emit_conflict("Input", conflicting_port, &dst_port, &mut self.diagnostics);
559 }
560 }
561 }
562 self.flat_graph.insert_edge(src, src_port, dst, dst_port)
563 }
564
    /// Runs the post-linking passes in order: create operator instances, check operator
    /// constraints, report unused port indices on names, and check loop-related rules.
    ///
    /// The order matters: the checks read the operator instances created by the first pass.
    fn process_operator_errors(&mut self) {
        self.make_operator_instances();
        self.check_operator_errors();
        self.warn_unused_port_indexing();
        self.check_loop_errors();
    }
572
    /// Delegates to `DfirGraph::insert_node_op_insts_all`, handing it the diagnostics list to
    /// report into. NOTE(review): presumably this creates the operator instance for every node
    /// that `node_op_inst` later returns — confirm against `DfirGraph`.
    fn make_operator_instances(&mut self) {
        self.flat_graph
            .insert_node_op_insts_all(&mut self.diagnostics);
    }
578
579 fn check_operator_errors(&mut self) {
582 for (node_id, node) in self.flat_graph.nodes() {
583 match node {
584 GraphNode::Operator(operator) => {
585 let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else {
586 continue;
588 };
589 let op_constraints = op_inst.op_constraints;
590
591 if op_constraints.num_args != operator.args.len() {
593 self.diagnostics.push(Diagnostic::spanned(
594 operator.span(),
595 Level::Error,
596 format!(
597 "expected {} argument(s), found {}",
598 op_constraints.num_args,
599 operator.args.len()
600 ),
601 ));
602 }
603
604 fn emit_arity_error(
607 operator: &Operator,
608 is_in: bool,
609 is_hard: bool,
610 degree: usize,
611 range: &dyn RangeTrait<usize>,
612 diagnostics: &mut Vec<Diagnostic>,
613 ) -> bool {
614 let op_name = &*operator.name_string();
615 let message = format!(
616 "`{}` {} have {} {}, actually has {}.",
617 op_name,
618 if is_hard { "must" } else { "should" },
619 range.human_string(),
620 if is_in { "input(s)" } else { "output(s)" },
621 degree,
622 );
623 let out_of_range = !range.contains(°ree);
624 if out_of_range {
625 diagnostics.push(Diagnostic::spanned(
626 operator.span(),
627 if is_hard {
628 Level::Error
629 } else {
630 Level::Warning
631 },
632 message,
633 ));
634 }
635 out_of_range
636 }
637
638 let inn_degree = self.flat_graph.node_degree_in(node_id);
639 let _ = emit_arity_error(
640 operator,
641 true,
642 true,
643 inn_degree,
644 op_constraints.hard_range_inn,
645 &mut self.diagnostics,
646 ) || emit_arity_error(
647 operator,
648 true,
649 false,
650 inn_degree,
651 op_constraints.soft_range_inn,
652 &mut self.diagnostics,
653 );
654
655 let out_degree = self.flat_graph.node_degree_out(node_id);
656 let _ = emit_arity_error(
657 operator,
658 false,
659 true,
660 out_degree,
661 op_constraints.hard_range_out,
662 &mut self.diagnostics,
663 ) || emit_arity_error(
664 operator,
665 false,
666 false,
667 out_degree,
668 op_constraints.soft_range_out,
669 &mut self.diagnostics,
670 );
671
672 fn emit_port_error<'a>(
673 operator_span: Span,
674 expected_ports_fn: Option<fn() -> PortListSpec>,
675 actual_ports_iter: impl Iterator<Item = &'a PortIndexValue>,
676 input_output: &'static str,
677 diagnostics: &mut Vec<Diagnostic>,
678 ) {
679 let Some(expected_ports_fn) = expected_ports_fn else {
680 return;
681 };
682 let PortListSpec::Fixed(expected_ports) = (expected_ports_fn)() else {
683 return;
685 };
686 let expected_ports: Vec<_> = expected_ports.into_iter().collect();
687
688 let ports: BTreeSet<_> = actual_ports_iter
690 .inspect(|actual_port_iv| {
693 let is_expected = expected_ports.iter().any(|port_index| {
695 actual_port_iv == &&port_index.clone().into()
696 });
697 if !is_expected {
699 diagnostics.push(Diagnostic::spanned(
700 actual_port_iv.span(),
701 Level::Error,
702 format!(
703 "Unexpected {} port: {}. Expected one of: `{}`",
704 input_output,
705 actual_port_iv.as_error_message_string(),
706 Itertools::intersperse(
707 expected_ports
708 .iter()
709 .map(|port| Cow::Owned(
710 port.to_token_stream().to_string()
711 )),
712 Cow::Borrowed("`, `")
713 ).collect::<String>()
714 ),
715 ))
716 }
717 })
718 .collect();
719
720 let missing: Vec<_> = expected_ports
722 .into_iter()
723 .filter_map(|expected_port| {
724 let tokens = expected_port.to_token_stream();
725 if !ports.contains(&&expected_port.into()) {
726 Some(tokens)
727 } else {
728 None
729 }
730 })
731 .collect();
732 if !missing.is_empty() {
733 diagnostics.push(Diagnostic::spanned(
734 operator_span,
735 Level::Error,
736 format!(
737 "Missing expected {} port(s): `{}`.",
738 input_output,
739 Itertools::intersperse(
740 missing.into_iter().map(|port| Cow::Owned(
741 port.to_token_stream().to_string()
742 )),
743 Cow::Borrowed("`, `")
744 )
745 .collect::<String>()
746 ),
747 ));
748 }
749 }
750
751 emit_port_error(
752 operator.span(),
753 op_constraints.ports_inn,
754 self.flat_graph
755 .node_predecessor_edges(node_id)
756 .map(|edge_id| self.flat_graph.edge_ports(edge_id).1),
757 "input",
758 &mut self.diagnostics,
759 );
760 emit_port_error(
761 operator.span(),
762 op_constraints.ports_out,
763 self.flat_graph
764 .node_successor_edges(node_id)
765 .map(|edge_id| self.flat_graph.edge_ports(edge_id).0),
766 "output",
767 &mut self.diagnostics,
768 );
769
770 {
772 let singletons_resolved =
773 self.flat_graph.node_singleton_references(node_id);
774 for (singleton_node_id, singleton_ident) in singletons_resolved
775 .iter()
776 .zip_eq(&*operator.singletons_referenced)
777 {
778 let &Some(singleton_node_id) = singleton_node_id else {
779 continue;
781 };
782 let Some(ref_op_inst) = self.flat_graph.node_op_inst(singleton_node_id)
783 else {
784 continue;
786 };
787 let ref_op_constraints = ref_op_inst.op_constraints;
788 if !ref_op_constraints.has_singleton_output {
789 self.diagnostics.push(Diagnostic::spanned(
790 singleton_ident.span(),
791 Level::Error,
792 format!(
793 "Cannot reference operator `{}`. Only operators with singleton state can be referenced.",
794 ref_op_constraints.name,
795 ),
796 ));
797 }
798 }
799 }
800 }
801 GraphNode::Handoff { .. } => todo!("Node::Handoff"),
802 GraphNode::ModuleBoundary { .. } => {
803 }
805 }
806 }
807 }
808
809 fn warn_unused_port_indexing(&mut self) {
812 for (_ident, varname_info) in self.varname_ends.iter() {
813 if !varname_info.inn_used {
814 Self::helper_check_unused_port(&mut self.diagnostics, &varname_info.ends, true);
815 }
816 if !varname_info.out_used {
817 Self::helper_check_unused_port(&mut self.diagnostics, &varname_info.ends, false);
818 }
819 }
820 }
821
822 fn helper_check_unused_port(diagnostics: &mut Vec<Diagnostic>, ends: &Ends, is_in: bool) {
825 let port = if is_in { &ends.inn } else { &ends.out };
826 if let Some((port, _)) = port {
827 if port.is_specified() {
828 diagnostics.push(Diagnostic::spanned(
829 port.span(),
830 Level::Error,
831 format!(
832 "{} port index is unused. (Is the port on the correct side?)",
833 if is_in { "Input" } else { "Output" },
834 ),
835 ));
836 }
837 }
838 }
839
840 fn helper_combine_ends(
845 diagnostics: &mut Vec<Diagnostic>,
846 og_ends: Ends,
847 inn_port: PortIndexValue,
848 out_port: PortIndexValue,
849 ) -> Ends {
850 Ends {
851 inn: Self::helper_combine_end(diagnostics, og_ends.inn, inn_port, "input"),
852 out: Self::helper_combine_end(diagnostics, og_ends.out, out_port, "output"),
853 }
854 }
855
856 fn helper_combine_end(
859 diagnostics: &mut Vec<Diagnostic>,
860 og: Option<(PortIndexValue, GraphDet)>,
861 other: PortIndexValue,
862 input_output: &'static str,
863 ) -> Option<(PortIndexValue, GraphDet)> {
864 let other_span = other.span();
867
868 let (og_port, og_node) = og?;
869 match og_port.combine(other) {
870 Ok(combined_port) => Some((combined_port, og_node)),
871 Err(og_port) => {
872 diagnostics.push(Diagnostic::spanned(
874 og_port.span(),
875 Level::Error,
876 format!(
877 "Indexing on {} is overwritten below ({}) (1/2).",
878 input_output,
879 PrettySpan(other_span),
880 ),
881 ));
882 diagnostics.push(Diagnostic::spanned(
883 other_span,
884 Level::Error,
885 format!(
886 "Cannot index on already-indexed {}, previously indexed above ({}) (2/2).",
887 input_output,
888 PrettySpan(og_port.span()),
889 ),
890 ));
891 Some((og_port, og_node))
894 }
895 }
896 }
897
898 fn check_loop_errors(&mut self) {
900 for (node_id, node) in self.flat_graph.nodes() {
901 let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else {
902 continue;
903 };
904 let Some(_loop_id) = self.flat_graph.node_loop(node_id) else {
905 continue;
906 };
907
908 if Some(FloType::Source) == op_inst.op_constraints.flo_type {
910 self.diagnostics.push(Diagnostic::spanned(
911 node.span(),
912 Level::Error,
913 format!(
914 "Source operator `{}(...)` must be at the root level, not within any `loop {{ ... }}` contexts.",
915 op_inst.op_constraints.name
916 )
917 ));
918 }
919
920 for persistence in &op_inst.generics.persistence_args {
922 if let Persistence::Tick | Persistence::Static = persistence {
923 self.diagnostics.push(Diagnostic::spanned(
924 op_inst.generics.generic_args.span(),
925 Level::Error,
926 "Operator uses a `'tick` or `'static` persistence, which is not allowed within a `loop {{ ... }}` context."
927 ));
928 }
929 }
930 }
931
932 for (_edge_id, (pred_id, node_id)) in self.flat_graph.edges() {
934 let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else {
935 continue;
936 };
937 let flo_type = &op_inst.op_constraints.flo_type;
938
939 let pred_loop_id = self.flat_graph.node_loop(pred_id);
940 let loop_id = self.flat_graph.node_loop(node_id);
941
942 let span = self.flat_graph.node(node_id).span();
943
944 let (is_input, is_output) = {
945 let parent_pred_loop_id =
946 pred_loop_id.and_then(|lid| self.flat_graph.loop_parent(lid));
947 let parent_loop_id = loop_id.and_then(|lid| self.flat_graph.loop_parent(lid));
948 let is_same = pred_loop_id == loop_id;
949 let is_input = !is_same && parent_loop_id == pred_loop_id;
950 let is_output = !is_same && parent_pred_loop_id == loop_id;
951 if !(is_input || is_output || is_same) {
952 self.diagnostics.push(Diagnostic::spanned(
953 span,
954 Level::Error,
955 "Operator input edge may not cross multiple loop contexts.",
956 ));
957 continue;
958 }
959 (is_input, is_output)
960 };
961
962 match flo_type {
963 None => {
964 if is_input {
965 self.diagnostics.push(Diagnostic::spanned(
966 span,
967 Level::Error,
968 format!(
969 "Operator `{}(...)` entering a loop context must be a windowing operator, but is not.",
970 op_inst.op_constraints.name
971 )
972 ));
973 }
974 if is_output {
975 self.diagnostics.push(Diagnostic::spanned(
976 span,
977 Level::Error,
978 format!(
979 "Operator `{}(...)` exiting a loop context must be an un-windowing operator, but is not.",
980 op_inst.op_constraints.name
981 )
982 ));
983 }
984 }
985 Some(FloType::Windowing) => {
986 if !is_input {
987 self.diagnostics.push(Diagnostic::spanned(
988 span,
989 Level::Error,
990 format!(
991 "Windowing operator `{}(...)` must be the first input operator into a `loop {{ ... }} context.",
992 op_inst.op_constraints.name
993 )
994 ));
995 }
996 }
997 Some(FloType::Unwindowing) => {
998 if !is_output {
999 self.diagnostics.push(Diagnostic::spanned(
1000 span,
1001 Level::Error,
1002 format!(
1003 "Un-windowing operator `{}(...)` must be the first output operator after exiting a `loop {{ ... }} context.",
1004 op_inst.op_constraints.name
1005 )
1006 ));
1007 }
1008 }
1009 Some(FloType::NextIteration) => {
1010 if loop_id.is_none() {
1012 self.diagnostics.push(Diagnostic::spanned(
1013 span,
1014 Level::Error,
1015 format!(
1016 "Operator `{}(...)` must be within a `loop {{ ... }}` context.",
1017 op_inst.op_constraints.name
1018 ),
1019 ));
1020 }
1021 }
1022 Some(FloType::Source) => {
1023 }
1025 }
1026 }
1027
1028 for (loop_id, loop_nodes) in self.flat_graph.loops() {
1032 let filter_next_iteration = |&node_id: &GraphNodeId| {
1034 self.flat_graph
1035 .node_op_inst(node_id)
1036 .map(|op_inst| Some(FloType::NextIteration) != op_inst.op_constraints.flo_type)
1037 .unwrap_or(true)
1038 };
1039
1040 let topo_sort_result = graph_algorithms::topo_sort(
1041 loop_nodes.iter().copied().filter(filter_next_iteration),
1042 |dst| {
1043 self.flat_graph
1044 .node_predecessor_nodes(dst)
1045 .filter(|&src| Some(loop_id) == self.flat_graph.node_loop(src))
1046 .filter(filter_next_iteration)
1047 },
1048 );
1049 if let Err(cycle) = topo_sort_result {
1050 let len = cycle.len();
1051 for (i, node_id) in cycle.into_iter().enumerate() {
1052 let span = self.flat_graph.node(node_id).span();
1053 self.diagnostics.push(Diagnostic::spanned(
1054 span,
1055 Level::Error,
1056 format!(
1057 "Operator forms an illegal cycle within a `loop {{ ... }}` block. Use `{}()` to pass data across loop iterations. ({}/{})",
1058 NEXT_ITERATION.name,
1059 i + 1,
1060 len,
1061 ),
1062 ));
1063 }
1064 }
1065 }
1066 }
1067}