1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531
//! Subgraph partioning algorithm
use std::collections::{BTreeMap, BTreeSet};
use proc_macro2::Span;
use slotmap::{SecondaryMap, SparseSecondaryMap};
use syn::parse_quote;
use super::hydroflow_graph::DfirGraph;
use super::ops::{find_node_op_constraints, DelayType};
use super::{graph_algorithms, Color, GraphEdgeId, GraphNode, GraphNodeId, GraphSubgraphId};
use crate::diagnostic::{Diagnostic, Level};
use crate::union_find::UnionFind;
/// Helper struct for tracking barrier crossers, see [`find_barrier_crossers`].
struct BarrierCrossers {
/// Edge barrier crossers, including what type.
pub edge_barrier_crossers: SecondaryMap<GraphEdgeId, DelayType>,
/// Singleton reference barrier crossers, considered to be [`DelayType::Stratum`].
pub singleton_barrier_crossers: Vec<(GraphNodeId, GraphNodeId)>,
}
impl BarrierCrossers {
/// Iterate pairs of nodes that are across a barrier.
fn iter_node_pairs<'a>(
&'a self,
partitioned_graph: &'a DfirGraph,
) -> impl 'a + Iterator<Item = ((GraphNodeId, GraphNodeId), DelayType)> {
let edge_pairs_iter = self
.edge_barrier_crossers
.iter()
.map(|(edge_id, &delay_type)| {
let src_dst = partitioned_graph.edge(edge_id);
(src_dst, delay_type)
});
let singleton_pairs_iter = self
.singleton_barrier_crossers
.iter()
.map(|&src_dst| (src_dst, DelayType::Stratum));
edge_pairs_iter.chain(singleton_pairs_iter)
}
/// Insert/replace edge.
fn replace_edge(&mut self, old_edge_id: GraphEdgeId, new_edge_id: GraphEdgeId) {
if let Some(delay_type) = self.edge_barrier_crossers.remove(old_edge_id) {
self.edge_barrier_crossers.insert(new_edge_id, delay_type);
}
}
}
/// Find all the barrier crossers.
fn find_barrier_crossers(partitioned_graph: &DfirGraph) -> BarrierCrossers {
let edge_barrier_crossers = partitioned_graph
.edges()
.filter_map(|(edge_id, (_src, dst))| {
let (_src_port, dst_port) = partitioned_graph.edge_ports(edge_id);
let op_constraints = partitioned_graph.node_op_inst(dst)?.op_constraints;
let input_barrier = (op_constraints.input_delaytype_fn)(dst_port)?;
Some((edge_id, input_barrier))
})
.collect();
let singleton_barrier_crossers = partitioned_graph
.node_ids()
.flat_map(|dst| {
partitioned_graph
.node_singleton_references(dst)
.iter()
.flatten()
.map(move |&src_ref| (src_ref, dst))
})
.collect();
BarrierCrossers {
edge_barrier_crossers,
singleton_barrier_crossers,
}
}
fn find_subgraph_unionfind(
partitioned_graph: &DfirGraph,
barrier_crossers: &BarrierCrossers,
) -> (UnionFind<GraphNodeId>, BTreeSet<GraphEdgeId>) {
// Modality (color) of nodes, push or pull.
// TODO(mingwei)? This does NOT consider `DelayType` barriers (which generally imply `Pull`),
// which makes it inconsistant with the final output in `as_code()`. But this doesn't create
// any bugs since we exclude `DelayType` edges from joining subgraphs anyway.
let mut node_color = partitioned_graph
.node_ids()
.filter_map(|node_id| {
let op_color = partitioned_graph.node_color(node_id)?;
Some((node_id, op_color))
})
.collect::<SparseSecondaryMap<_, _>>();
let mut subgraph_unionfind: UnionFind<GraphNodeId> =
UnionFind::with_capacity(partitioned_graph.nodes().len());
// Will contain all edges which are handoffs. Starts out with all edges and
// we remove from this set as we combine nodes into subgraphs.
let mut handoff_edges: BTreeSet<GraphEdgeId> = partitioned_graph.edge_ids().collect();
// Would sort edges here for priority (for now, no sort/priority).
// Each edge gets looked at in order. However we may not know if a linear
// chain of operators is PUSH vs PULL until we look at the ends. A fancier
// algorithm would know to handle linear chains from the outside inward.
// But instead we just run through the edges in a loop until no more
// progress is made. Could have some sort of O(N^2) pathological worst
// case.
let mut progress = true;
while progress {
progress = false;
// TODO(mingwei): Could this iterate `handoff_edges` instead? (Modulo ownership). Then no case (1) below.
for (edge_id, (src, dst)) in partitioned_graph.edges().collect::<Vec<_>>() {
// Ignore (1) already added edges as well as (2) new self-cycles. (Unless reference edge).
if subgraph_unionfind.same_set(src, dst) {
// Note that the _edge_ `edge_id` might not be in the subgraph even when both `src` and `dst` are. This prevents case 2.
// Handoffs will be inserted later for this self-loop.
continue;
}
// Do not connect stratum crossers (next edges).
if barrier_crossers
.iter_node_pairs(partitioned_graph)
.any(|((x_src, x_dst), _)| {
(subgraph_unionfind.same_set(x_src, src)
&& subgraph_unionfind.same_set(x_dst, dst))
|| (subgraph_unionfind.same_set(x_src, dst)
&& subgraph_unionfind.same_set(x_dst, src))
})
{
continue;
}
// Do not connect across loop contexts.
if partitioned_graph.node_loop(src) != partitioned_graph.node_loop(dst) {
continue;
}
if can_connect_colorize(&mut node_color, src, dst) {
// At this point we have selected this edge and its src & dst to be
// within a single subgraph.
subgraph_unionfind.union(src, dst);
assert!(handoff_edges.remove(&edge_id));
progress = true;
}
}
}
(subgraph_unionfind, handoff_edges)
}
/// Builds the datastructures for checking which subgraph each node belongs to
/// after handoffs have already been inserted to partition subgraphs.
/// This list of nodes in each subgraph are returned in topological sort order.
fn make_subgraph_collect(
partitioned_graph: &DfirGraph,
mut subgraph_unionfind: UnionFind<GraphNodeId>,
) -> SecondaryMap<GraphNodeId, Vec<GraphNodeId>> {
// We want the nodes of each subgraph to be listed in topo-sort order.
// We could do this on each subgraph, or we could do it all at once on the
// whole node graph by ignoring handoffs, which is what we do here:
let topo_sort = graph_algorithms::topo_sort(
partitioned_graph
.nodes()
.filter(|&(_, node)| !matches!(node, GraphNode::Handoff { .. }))
.map(|(node_id, _)| node_id),
|v| {
partitioned_graph
.node_predecessor_nodes(v)
.filter(|&pred_id| {
let pred = partitioned_graph.node(pred_id);
!matches!(pred, GraphNode::Handoff { .. })
})
},
)
.expect("Subgraphs are in-out trees.");
let mut grouped_nodes: SecondaryMap<GraphNodeId, Vec<GraphNodeId>> = Default::default();
for node_id in topo_sort {
let repr_node = subgraph_unionfind.find(node_id);
if !grouped_nodes.contains_key(repr_node) {
grouped_nodes.insert(repr_node, Default::default());
}
grouped_nodes[repr_node].push(node_id);
}
grouped_nodes
}
/// Find subgraph and insert handoffs.
/// Modifies barrier_crossers so that the edge OUT of an inserted handoff has
/// the DelayType data.
fn make_subgraphs(partitioned_graph: &mut DfirGraph, barrier_crossers: &mut BarrierCrossers) {
// Algorithm:
// 1. Each node begins as its own subgraph.
// 2. Collect edges. (Future optimization: sort so edges which should not be split across a handoff come first).
// 3. For each edge, try to join `(to, from)` into the same subgraph.
// TODO(mingwei):
// self.partitioned_graph.assert_valid();
let (subgraph_unionfind, handoff_edges) =
find_subgraph_unionfind(partitioned_graph, barrier_crossers);
// Insert handoffs between subgraphs (or on subgraph self-loop edges)
for edge_id in handoff_edges {
let (src_id, dst_id) = partitioned_graph.edge(edge_id);
// Already has a handoff, no need to insert one.
let src_node = partitioned_graph.node(src_id);
let dst_node = partitioned_graph.node(dst_id);
if matches!(src_node, GraphNode::Handoff { .. })
|| matches!(dst_node, GraphNode::Handoff { .. })
{
continue;
}
let hoff = GraphNode::Handoff {
src_span: src_node.span(),
dst_span: dst_node.span(),
};
let (_node_id, out_edge_id) = partitioned_graph.insert_intermediate_node(edge_id, hoff);
// Update barrier_crossers for inserted node.
barrier_crossers.replace_edge(edge_id, out_edge_id);
}
// Determine node's subgraph and subgraph's nodes.
// This list of nodes in each subgraph are to be in topological sort order.
// Eventually returned directly in the `HydroflowGraph`.
let grouped_nodes = make_subgraph_collect(partitioned_graph, subgraph_unionfind);
for (_repr_node, member_nodes) in grouped_nodes {
partitioned_graph.insert_subgraph(member_nodes).unwrap();
}
}
/// Set `src` or `dst` color if `None` based on the other (if possible):
/// `None` indicates an op could be pull or push i.e. unary-in & unary-out.
/// So in that case we color `src` or `dst` based on its newfound neighbor (the other one).
///
/// Returns if `src` and `dst` can be in the same subgraph.
fn can_connect_colorize(
node_color: &mut SparseSecondaryMap<GraphNodeId, Color>,
src: GraphNodeId,
dst: GraphNodeId,
) -> bool {
// Pull -> Pull
// Push -> Push
// Pull -> [Computation] -> Push
// Push -> [Handoff] -> Pull
let can_connect = match (node_color.get(src), node_color.get(dst)) {
// Linear chain, can't connect because it may cause future conflicts.
// But if it doesn't in the _future_ we can connect it (once either/both ends are determined).
(None, None) => false,
// Infer left side.
(None, Some(Color::Pull | Color::Comp)) => {
node_color.insert(src, Color::Pull);
true
}
(None, Some(Color::Push | Color::Hoff)) => {
node_color.insert(src, Color::Push);
true
}
// Infer right side.
(Some(Color::Pull | Color::Hoff), None) => {
node_color.insert(dst, Color::Pull);
true
}
(Some(Color::Comp | Color::Push), None) => {
node_color.insert(dst, Color::Push);
true
}
// Both sides already specified.
(Some(Color::Pull), Some(Color::Pull)) => true,
(Some(Color::Pull), Some(Color::Comp)) => true,
(Some(Color::Pull), Some(Color::Push)) => true,
(Some(Color::Comp), Some(Color::Pull)) => false,
(Some(Color::Comp), Some(Color::Comp)) => false,
(Some(Color::Comp), Some(Color::Push)) => true,
(Some(Color::Push), Some(Color::Pull)) => false,
(Some(Color::Push), Some(Color::Comp)) => false,
(Some(Color::Push), Some(Color::Push)) => true,
// Handoffs are not part of subgraphs.
(Some(Color::Hoff), Some(_)) => false,
(Some(_), Some(Color::Hoff)) => false,
};
can_connect
}
/// Stratification is surprisingly tricky. Basically it is topological sort, but with some nuance.
///
/// Returns an error if there is a cycle thru negation.
fn find_subgraph_strata(
partitioned_graph: &mut DfirGraph,
barrier_crossers: &BarrierCrossers,
) -> Result<(), Diagnostic> {
// Determine subgraphs's stratum number.
// Find SCCs ignoring `defer_tick()` (`DelayType::Tick`) edges, then do TopoSort on the
// resulting DAG.
// Cycles thru cross-stratum negative edges (both `DelayType::Tick` and `DelayType::Stratum`)
// are an error.
// Generate a subgraph graph. I.e. each node is a subgraph.
// Edges are connections between subgraphs, ignoring tick-crossers.
// TODO: use DiMulGraph here?
#[derive(Default)]
struct SubgraphGraph {
preds: BTreeMap<GraphSubgraphId, Vec<GraphSubgraphId>>,
succs: BTreeMap<GraphSubgraphId, Vec<GraphSubgraphId>>,
}
impl SubgraphGraph {
fn insert_edge(&mut self, src: GraphSubgraphId, dst: GraphSubgraphId) {
self.preds.entry(dst).or_default().push(src);
self.succs.entry(src).or_default().push(dst);
}
}
let mut subgraph_graph = SubgraphGraph::default();
// Negative (next stratum) connections between subgraphs. (Ignore `defer_tick()` connections).
let mut subgraph_stratum_barriers: BTreeSet<(GraphSubgraphId, GraphSubgraphId)> =
Default::default();
// Iterate handoffs between subgraphs, to build a subgraph meta-graph.
for (node_id, node) in partitioned_graph.nodes() {
if matches!(node, GraphNode::Handoff { .. }) {
assert_eq!(1, partitioned_graph.node_successors(node_id).count());
let (succ_edge, succ) = partitioned_graph.node_successors(node_id).next().unwrap();
// TODO(mingwei): Should we look at the singleton references too?
let succ_edge_delaytype = barrier_crossers
.edge_barrier_crossers
.get(succ_edge)
.copied();
// Ignore tick edges.
if let Some(DelayType::Tick | DelayType::TickLazy) = succ_edge_delaytype {
continue;
}
assert_eq!(1, partitioned_graph.node_predecessors(node_id).count());
let (_edge_id, pred) = partitioned_graph.node_predecessors(node_id).next().unwrap();
let pred_sg = partitioned_graph.node_subgraph(pred).unwrap();
let succ_sg = partitioned_graph.node_subgraph(succ).unwrap();
subgraph_graph.insert_edge(pred_sg, succ_sg);
if Some(DelayType::Stratum) == succ_edge_delaytype {
subgraph_stratum_barriers.insert((pred_sg, succ_sg));
}
}
}
// Include reference edges as well.
// TODO(mingwei): deduplicate graph building code.
for &(pred, succ) in barrier_crossers.singleton_barrier_crossers.iter() {
assert_ne!(pred, succ, "TODO(mingwei)");
let pred_sg = partitioned_graph.node_subgraph(pred).unwrap();
let succ_sg = partitioned_graph.node_subgraph(succ).unwrap();
assert_ne!(pred_sg, succ_sg);
subgraph_graph.insert_edge(pred_sg, succ_sg);
subgraph_stratum_barriers.insert((pred_sg, succ_sg));
}
// Topological sort (of strongly connected components) is how we find the (nondecreasing)
// order of strata.
let topo_sort_order = graph_algorithms::topo_sort_scc(
|| partitioned_graph.subgraph_ids(),
|v| subgraph_graph.preds.get(&v).into_iter().flatten().cloned(),
|u| subgraph_graph.succs.get(&u).into_iter().flatten().cloned(),
);
// Each subgraph's stratum number is the same as it's predecessors. Unless there is a negative
// edge, then we increment.
for sg_id in topo_sort_order {
let stratum = subgraph_graph
.preds
.get(&sg_id)
.into_iter()
.flatten()
.filter_map(|&pred_sg_id| {
partitioned_graph
.subgraph_stratum(pred_sg_id)
.map(|stratum| {
stratum
+ (subgraph_stratum_barriers.contains(&(pred_sg_id, sg_id)) as usize)
})
})
.max()
.unwrap_or(0);
partitioned_graph.set_subgraph_stratum(sg_id, stratum);
}
// Re-introduce the `defer_tick()` edges, ensuring they actually go to the next tick.
let extra_stratum = partitioned_graph.max_stratum().unwrap_or(0) + 1; // Used for `defer_tick()` delayer subgraphs.
for (edge_id, &delay_type) in barrier_crossers.edge_barrier_crossers.iter() {
let (hoff, dst) = partitioned_graph.edge(edge_id);
let (_hoff_port, dst_port) = partitioned_graph.edge_ports(edge_id);
assert_eq!(1, partitioned_graph.node_predecessors(hoff).count());
let src = partitioned_graph
.node_predecessor_nodes(hoff)
.next()
.unwrap();
let src_sg = partitioned_graph.node_subgraph(src).unwrap();
let dst_sg = partitioned_graph.node_subgraph(dst).unwrap();
let src_stratum = partitioned_graph.subgraph_stratum(src_sg);
let dst_stratum = partitioned_graph.subgraph_stratum(dst_sg);
match delay_type {
DelayType::Tick | DelayType::TickLazy => {
let is_lazy = matches!(delay_type, DelayType::TickLazy);
// If tick edge goes foreward in stratum, need to buffer.
// (TODO(mingwei): could use a different kind of handoff.)
// Or if lazy, need to create extra subgraph to mark as lazy.
if src_stratum <= dst_stratum || is_lazy {
// We inject a new subgraph between the src/dst which runs as the last stratum
// of the tick and therefore delays the data until the next tick.
// Before: A (src) -> H -> B (dst)
// Then add intermediate identity:
let (new_node_id, new_edge_id) = partitioned_graph.insert_intermediate_node(
edge_id,
// TODO(mingwei): Proper span w/ `parse_quote_spanned!`?
GraphNode::Operator(parse_quote! { identity() }),
);
// Intermediate: A (src) -> H -> ID -> B (dst)
let hoff = GraphNode::Handoff {
src_span: Span::call_site(), // TODO(mingwei): Proper spanning?
dst_span: Span::call_site(),
};
let (_hoff_node_id, _hoff_edge_id) =
partitioned_graph.insert_intermediate_node(new_edge_id, hoff);
// After: A (src) -> H -> ID -> H' -> B (dst)
// Set stratum number for new intermediate:
// Create subgraph.
let new_subgraph_id = partitioned_graph
.insert_subgraph(vec![new_node_id])
.unwrap();
// Assign stratum.
partitioned_graph.set_subgraph_stratum(new_subgraph_id, extra_stratum);
// Assign laziness.
partitioned_graph.set_subgraph_laziness(new_subgraph_id, is_lazy);
}
}
DelayType::Stratum => {
// Any negative edges which go onto the same or previous stratum are bad.
// Indicates an unbroken negative cycle.
// TODO(mingwei): This check is insufficient: https://github.com/hydro-project/hydro/issues/1115#issuecomment-2018385033
if dst_stratum <= src_stratum {
return Err(Diagnostic::spanned(dst_port.span(), Level::Error, "Negative edge creates a negative cycle which must be broken with a `defer_tick()` operator."));
}
}
DelayType::MonotoneAccum => {
// cycles are actually fine
continue;
}
}
}
Ok(())
}
/// Put `is_external_input: true` operators in separate stratum 0 subgraphs if they are not in stratum 0.
/// By ripping them out of their subgraph/stratum if they're not already in statum 0.
fn separate_external_inputs(partitioned_graph: &mut DfirGraph) {
let external_input_nodes: Vec<_> = partitioned_graph
.nodes()
// Ensure node is an operator (not a handoff), get constraints spec.
.filter_map(|(node_id, node)| {
find_node_op_constraints(node).map(|op_constraints| (node_id, op_constraints))
})
// Ensure current `node_id` is an external input.
.filter(|(_node_id, op_constraints)| op_constraints.is_external_input)
// Collect just `node_id`s.
.map(|(node_id, _op_constraints)| node_id)
// Ignore if operator node is already stratum 0.
.filter(|&node_id| {
0 != partitioned_graph
.subgraph_stratum(partitioned_graph.node_subgraph(node_id).unwrap())
.unwrap()
})
.collect();
for node_id in external_input_nodes {
// Remove node from old subgraph.
assert!(
partitioned_graph.remove_from_subgraph(node_id),
"Cannot move input node that is not in a subgraph, this is a Hydroflow bug."
);
// Create new subgraph in stratum 0 for this source.
let new_sg_id = partitioned_graph.insert_subgraph(vec![node_id]).unwrap();
partitioned_graph.set_subgraph_stratum(new_sg_id, 0);
// Insert handoff.
for edge_id in partitioned_graph
.node_successor_edges(node_id)
.collect::<Vec<_>>()
{
let span = partitioned_graph.node(node_id).span();
let hoff = GraphNode::Handoff {
src_span: span,
dst_span: span,
};
partitioned_graph.insert_intermediate_node(edge_id, hoff);
}
}
}
/// Main method for this module. Partions a flat [`DfirGraph`] into one with subgraphs.
///
/// Returns an error if a negative cycle exists in the graph. Negative cycles prevent partioning.
pub fn partition_graph(flat_graph: DfirGraph) -> Result<DfirGraph, Diagnostic> {
// Pre-find barrier crossers (input edges with a `DelayType`).
let mut barrier_crossers = find_barrier_crossers(&flat_graph);
let mut partitioned_graph = flat_graph;
// Partition into subgraphs.
make_subgraphs(&mut partitioned_graph, &mut barrier_crossers);
// Find strata for subgraphs (early returns with error if negative cycle found).
find_subgraph_strata(&mut partitioned_graph, &barrier_crossers)?;
// Ensure all external inputs are in stratum 0.
separate_external_inputs(&mut partitioned_graph);
Ok(partitioned_graph)
}