ads2_2022/code/python/src/algorithms/tarjan/algorithms.py

172 lines
5.5 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# IMPORTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
from __future__ import annotations;
from src.thirdparty.types import *;
from src.core.log import *;
from src.models.stacks import *;
from src.models.graphs import *;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# EXPORTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
__all__ = [
'tarjan_algorithm',
];
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# CONSTANTS
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class State(Enum):
    '''
    Visit status of a node during the depth-first traversal.
    '''
    # node has not been entered by any visit yet
    UNTOUCHED = 0
    # node has been entered; its successors are still being processed
    PENDING = 1
    # all successors of the node have been processed
    FINISHED = 2
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Tarjan Algorithm
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def tarjan_algorithm(G: Graph, verbose: bool = False) -> List[Any]:
    '''
    # Tarjan Algorithm #
    Computes the strongly connected components of a graph via Tarjan's algorithm.

    @inputs
    - `G` - the graph; its `nodes` are used as starting points of the search.
    - `verbose` - if true, every component found is passed to `log_debug`
      at the end (the flag is also handed on to the context).

    @returns
    the list of components collected in the context, each one a list of nodes.
    '''
    # fresh context - every node of G starts out as UNTOUCHED:
    ctx = Context(G, verbose=verbose)
    # start a depth-first visit from each node that no earlier visit has reached:
    for root in G.nodes:
        if ctx.get_state(root) != State.UNTOUCHED:
            continue
        tarjan_visit(G, root, ctx)
    if verbose:
        for scc in ctx.components:
            log_debug(scc)
    return ctx.components
def tarjan_visit(G: Graph, u: Any, ctx: Context):
    '''
    Recursive depth-first visit of node `u`, used to compute the strongly
    connected components of a graph.

    @inputs
    - `G` - the graph; `G.successors(u)` yields the child nodes of `u`.
    - `u` - the node to visit.
    - `ctx` - shared context, updated in place: index counter, stack,
      per-node information and the list of completed components.
    '''
    # Enter u: take the next visit-index, put u on the stack, and start with
    # least-index == visit-index.
    ctx.max_index += 1
    visit_index = ctx.max_index
    ctx.push(u)
    ctx.set_least_index(u, visit_index)
    ctx.set_index(u, visit_index)
    ctx.set_state(u, State.PENDING)

    # Process the children of u.
    # NOTE: nodes are only popped when the root of their component completes,
    # so a child still on the stack belongs to a component not yet closed.
    for child in G.successors(u):
        if ctx.get_state(child) == State.UNTOUCHED:
            # unvisited child: recurse first, then inherit its least-index
            tarjan_visit(G, child, ctx)
            candidate = ctx.get_least_index(child)
        elif ctx.stack_contains(child):
            # child already visited and still on the stack: use its visit-index
            candidate = ctx.get_index(child)
        else:
            # child no longer on the stack: no effect on u
            continue
        ctx.set_least_index(u, min(ctx.get_least_index(u), candidate))

    ctx.set_state(u, State.FINISHED)
    ctx.log_info(u)

    # Only the node whose least-index equals its own visit-index closes a component.
    if ctx.get_index(u) != ctx.get_least_index(u):
        return

    # Pop everything from the stack down to and including u; this is one component.
    scc = []
    closed = False
    while not closed:
        top = ctx.top()
        ctx.pop()
        scc.append(top)
        closed = (u == top)
    ctx.components.append(scc)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# AUXILIARY context variables for algorithm
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@dataclass
class NodeInformationDefault:
    '''
    Per-node bookkeeping record with default values for every field.
    '''
    # the node this record belongs to
    node: Any = None
    # smallest index recorded for the node so far (see `Context.set_least_index`)
    least_index: int = 0
    # visit-index assigned to the node
    index: int = 0
    # visit status; left out of the generated repr
    state: State = field(default=State.UNTOUCHED, repr=False)
class NodeInformation(NodeInformationDefault):
    '''
    Bookkeeping record for one node: default values, bound to the node `u`.
    '''

    def __init__(self, u: Any):
        # all other fields keep their dataclass defaults
        super().__init__(node=u)
@dataclass
class ContextDefault:
    '''
    Default values of the state shared across one run of the algorithm.
    '''
    # highest visit-index handed out so far
    max_index: int = 0
    # whether debug logging is enabled
    verbose: bool = False
    # stack of nodes; a fresh Stack per instance
    stack: Stack = field(default_factory=Stack)
    # components found so far, each one a list of nodes
    components: list[list[Any]] = field(default_factory=list)
    # bookkeeping record per node
    infos: dict[Any, NodeInformation] = field(default_factory=dict)
class Context(ContextDefault):
    '''
    State shared across one run of the algorithm: index counter, node stack,
    list of components, and one `NodeInformation` record per node of the graph.
    '''

    def __init__(self, G: Graph, verbose: bool):
        '''
        Sets up the default state and creates a record for every node in `G.nodes`.
        '''
        super().__init__(verbose=verbose)
        self.infos = {node: NodeInformation(node) for node in G.nodes}

    # ---- stack access (delegates to the underlying stack) ----

    def push(self, u: Any):
        '''Pushes `u` onto the stack.'''
        self.stack.push(u)

    def top(self) -> Any:
        '''Returns the result of the stack's `top()`.'''
        return self.stack.top()

    def pop(self) -> Any:
        '''Calls the stack's `pop()` and returns its result.'''
        return self.stack.pop()

    def stack_contains(self, u: Any) -> bool:
        '''Returns the result of the stack's `contains(u)`.'''
        return self.stack.contains(u)

    # ---- writing node information ----

    def update_infos(self, u: Any, info: NodeInformation):
        '''Stores `info` as the record of node `u`.'''
        self.infos[u] = info

    def set_state(self, u: Any, state: State):
        '''Sets the visit status of node `u`.'''
        entry = self.get_info(u)
        entry.state = state
        self.update_infos(u, entry)

    def set_least_index(self, u: Any, least_index: int):
        '''Sets the least-index of node `u`.'''
        entry = self.get_info(u)
        entry.least_index = least_index
        self.update_infos(u, entry)

    def set_index(self, u: Any, index: int):
        '''Sets the visit-index of node `u`.'''
        entry = self.get_info(u)
        entry.index = index
        self.update_infos(u, entry)

    # ---- reading node information ----

    def get_info(self, u: Any) -> NodeInformation:
        '''Returns the record of node `u`.'''
        return self.infos[u]

    def get_state(self, u: Any) -> State:
        '''Returns the visit status of node `u`.'''
        return self.infos[u].state

    def get_least_index(self, u: Any) -> int:
        '''Returns the least-index of node `u`.'''
        return self.infos[u].least_index

    def get_index(self, u: Any) -> int:
        '''Returns the visit-index of node `u`.'''
        return self.infos[u].index

    # ---- logging ----

    def log_info(self, u: Any):
        '''Passes the record of node `u` to `log_debug`, but only in verbose mode.'''
        if self.verbose:
            log_debug(self.get_info(u))