Source code for capsul.pipeline.topological_sort

# -*- coding: utf-8 -*-
'''
Build graph with dependencies from a pipeline

Classes
=======
:class:`GraphNode`
------------------
'''

# System import
from __future__ import absolute_import
from __future__ import print_function
import logging
import six

# Define the logger
logger = logging.getLogger(__name__)


[docs] class GraphNode(object): """ Simple Graph Node Structure Attributes ---------- name : str the node name meta : object a python object stored in the node links_to : list object to store the graph edges: successor links_from : list object to store the graph edges: predecessor links_to_degree : int degree of the node regarding the successors links_from_degree : int degree of the node regarding the predecessors Methods -------- add_link_to remove_link_to add_link_from remove_link_from """ def __init__(self, name, meta): """ Create a Graph Node Parameters ---------- name: str (mandatory) the name of the node meta: object an python object to store in the node """ self.name = name self.meta = meta # variables to store the graph edges self.links_to = [] self.links_from = [] # the degree of the node self.links_to_degree = 0 self.links_from_degree = 0
[docs] class Graph(object): """ Simple Graph Structure on which we want to perform a topological tree (no cycle). The algorithm is based on the R.E. Tarjanlinear linear optimization (O(N+A)). Attributes ---------- _nodes : dict the graph nodes {node.name: node} _links : list graph edges (from_node, to_node) Methods -------- add_node find_node add_link topological_sort """ def __init__(self): """ Create a Graph """ self._nodes = {} self._links = []
[docs] def add_node(self, node): """ Method to add a GraphNode in the Graph Parameters ---------- node: GraphNode (mandatory) the node to insert """ logging.debug("node: {0}".format(node.name)) if not isinstance(node, GraphNode): raise Exception("Expect a GraphNode, got {0}".format(node)) if node.name in self._nodes: raise Exception("Expect a GraphNode with a unique name, " "got {0}".format(node)) self._nodes[node.name] = node
[docs] def find_node(self, node_name): """ Method to find a GraphNode in the Graph Parameters ---------- node_name: str (mandatory) the name of the desired node """ if node_name in self._nodes: return self._nodes[node_name] return None
[docs] def topological_sort(self): """ Perform the topological sort: find an order in which all the nodes can be taken. Step 1: Identify nodes that have no incoming link (nnil). Step 2: Loop until there are nnil a) Delete the current nodes c_nnil of in-degree 0. b) Place it in the output. c) Remove all its outgoing links from the graph. d) If the node has in-degree 0, add the node to nnil. Step 3: Assert that there is no loop in the graph. Returns ------- output: list of tuple a list of ordered nodes with a tuple element containing the node name and the node meta element. """ ordered_nodes = [] # Step 1 nnil = [] for name, node in six.iteritems(self._nodes): if node.links_from_degree == 0: nnil.append(node) # Step 2 while len(nnil): #-- a c_nnil = nnil.pop() #-- b ordered_nodes.append(c_nnil) #-- c for node in c_nnil.links_to: node.remove_link_from(c_nnil) #-- d if node.links_from_degree == 0: nnil.append(node) # Step 3 if len(ordered_nodes) == len(self._nodes): return [(node.name, node.meta) for node in ordered_nodes] else: raise Exception("There is loop in the Graph." "Please inverstigate")
if __name__ == '__main__': """ A toy example: slip -> chaussettes -> chemise -> veste -> pantalon -> ceinture -> chaussures -> cravate """ objects = ["chaussures", "chaussettes", "slip", "pantalon", "ceinture", "chemise", "veste", "cravate"] dependencies = [ ("slip", "pantalon"), ("chemise", "cravate"), ("chemise", "pantalon"), ("pantalon", "ceinture"), ("chaussettes", "chaussures"), ("pantalon", "chaussures"), ("ceinture", "chaussures"), ("chemise", "veste"), ] g = Graph() for o in objects: g.add_node(GraphNode(o, None)) for d in dependencies: g.add_link(d[0], d[1]) r = g.topological_sort() r = [x[0] for x in r] print(" -> ".join(r))