Topological sort of a graph, allowing better console (and other) outputs.

Uses an algorithm borrowed from the networkx graph library to sort a graph in
topological order. The method is only used by output plugins, as the
internal plumbing does not really care about the node order.

Also includes a bonobo.util.python.require function that helps import
things in a package-less context, or when there are conflicts with
site-package names.
This commit is contained in:
Romain Dorgueil
2017-05-19 13:28:31 +02:00
parent aa6da3aa2b
commit e747bc1da8
16 changed files with 214 additions and 64 deletions

View File

@ -85,3 +85,8 @@ class Configurable(metaclass=ConfigurableMeta):
# set option values.
for name, value in kwargs.items():
setattr(self, name, value)
def __call__(self, *args, **kwargs):
""" Make instances callable by delegating to ``self.call(*args, **kwargs)`` and returning its result.

You can implement a configurable callable behaviour by implementing the call(...) method. Of course, it is
also backward compatible with legacy __call__ override.
"""
return self.call(*args, **kwargs)

View File

@ -5,9 +5,19 @@ def require(package, requirement=None):
return __import__(package)
except ImportError:
from colorama import Fore, Style
print(Fore.YELLOW, 'This example requires the {!r} package. Install it using:'.format(requirement),
Style.RESET_ALL, sep='')
print(
Fore.YELLOW,
'This example requires the {!r} package. Install it using:'.
format(requirement),
Style.RESET_ALL,
sep=''
)
print()
print(Fore.YELLOW, ' $ pip install {!s}'.format(requirement), Style.RESET_ALL, sep='')
print(
Fore.YELLOW,
' $ pip install {!s}'.format(requirement),
Style.RESET_ALL,
sep=''
)
print()
raise
raise

View File

@ -87,7 +87,6 @@ class LoopingExecutionContext(Wrapper):
finally:
self._stopped = True
def handle_error(self, exc, trace):
return print_error(exc, trace, context=self.wrapped)

View File

@ -21,7 +21,7 @@ class GraphExecutionContext:
def __init__(self, graph, plugins=None, services=None):
self.graph = graph
self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph.nodes]
self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph]
self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()]
self.services = Container(services) if services else Container()
@ -65,4 +65,4 @@ class GraphExecutionContext:
def stop(self):
# todo use strategy
for node in self.nodes:
node.stop()
node.stop()

View File

@ -73,19 +73,19 @@ class ConsoleOutputPlugin(Plugin):
def write(context, prefix='', rewind=True, append=None, debug=False, profile=False):
t_cnt = len(context)
for i, node in enumerate(context):
for i in context.graph.topologically_sorted_indexes:
node = context[i]
if node.alive:
_line = ''.join(
(
Fore.BLACK, '({})'.format(i + 1), Style.RESET_ALL, ' ', Style.BRIGHT, '+', Style.RESET_ALL, ' ',
node.name, ' ', node.get_statistics_as_string(debug=debug,
profile=profile), Style.RESET_ALL, ' ',
' ', Style.BRIGHT, '+', Style.RESET_ALL, ' ', node.name, '(', str(i), ') ',
node.get_statistics_as_string(debug=debug, profile=profile), Style.RESET_ALL, ' ',
)
)
else:
_line = ''.join(
(
Fore.BLACK, '({})'.format(i + 1), ' - ', node.name, ' ',
' ', Fore.BLACK, '-', ' ', node.name, '(', str(i), ') ',
node.get_statistics_as_string(debug=debug, profile=profile), Style.RESET_ALL, ' ',
)
)

View File

@ -55,7 +55,7 @@ class CsvReader(CsvHandler, FileReader):
for row in reader:
if len(row) != field_count:
raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count,))
raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, ))
yield dict(zip(headers.value, row))
@ -74,5 +74,3 @@ class CsvWriter(CsvHandler, FileWriter):
writer.writerow(row[header] for header in headers.value)
lineno.value += 1
return NOT_MODIFIED

View File

@ -43,7 +43,7 @@ class Bag:
def args(self):
if self._parent is None:
return self._args
return (*self._parent.args, *self._args,)
return (*self._parent.args, *self._args, )
@property
def kwargs(self):
@ -85,7 +85,7 @@ class Bag:
@classmethod
def inherit(cls, *args, **kwargs):
return cls(*args, _flags=(INHERIT_INPUT,), **kwargs)
return cls(*args, _flags=(INHERIT_INPUT, ), **kwargs)
def __eq__(self, other):
return isinstance(other, Bag) and other.args == self.args and other.kwargs == self.kwargs
@ -93,7 +93,7 @@ class Bag:
def __repr__(self):
return '<{} ({})>'.format(
type(self).__name__, ', '.
join(itertools.chain(
join(itertools.chain(
map(repr, self.args),
('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()),
))

View File

@ -3,35 +3,115 @@ from bonobo.constants import BEGIN
class Graph:
"""
Represents a coherent directed acyclic graph of components.
Represents a directed graph of nodes.
"""
def __init__(self, *chain):
self.edges = {BEGIN: set()}
self.named = {}
self.nodes = []
self.graph = {BEGIN: set()}
self.add_chain(*chain)
def __iter__(self):
yield from self.nodes
def __len__(self):
""" Node count.
"""
return len(self.nodes)
def __getitem__(self, key):
return self.nodes[key]
def outputs_of(self, idx, create=False):
if create and not idx in self.graph:
self.graph[idx] = set()
return self.graph[idx]
""" Get a set of the outputs for a given node index.
"""
if create and not idx in self.edges:
self.edges[idx] = set()
return self.edges[idx]
def add_node(self, c):
i = len(self.nodes)
""" Add a node without connections in this graph and returns its index.
"""
idx = len(self.nodes)
self.edges[idx] = set()
self.nodes.append(c)
return i
return idx
def add_chain(self, *nodes, _input=BEGIN, _output=None):
for node in nodes:
_next = self.add_node(node)
self.outputs_of(_input, create=True).add(_next)
_input = _next
if _output:
if not _output in self.nodes:
raise ValueError('Output not found.')
self.outputs_of(_input, create=True).add(self.nodes.index(_output))
def add_chain(self, *nodes, _input=BEGIN, _output=None, _name=None):
""" Add a chain in this graph.
"""
if len(nodes):
_input = self._resolve_index(_input)
_output = self._resolve_index(_output)
for i, node in enumerate(nodes):
_next = self.add_node(node)
if not i and _name:
if _name in self.named:
raise KeyError('Duplicate name {!r} in graph.'.format(_name))
self.named[_name] = _next
self.outputs_of(_input, create=True).add(_next)
_input = _next
if _output is not None:
self.outputs_of(_input, create=True).add(_output)
if hasattr(self, '_topologcally_sorted_indexes_cache'):
del self._topologcally_sorted_indexes_cache
return self
def __len__(self):
return len(self.nodes)
@property
def topologically_sorted_indexes(self):
    """Return the integer node indexes of this graph in topological order.

    The traversal is an iterative depth-first search modelled on networkx's
    ``topological_sort()``. Every key of ``self.edges`` is used as a
    potential root; keys that are not plain ``int`` (for instance a
    non-integer entry-point marker) are walked but left out of the result.

    The result is memoised on the instance. The attribute name
    ``_topologcally_sorted_indexes_cache`` is misspelt on purpose: it must
    match the name that ``add_chain`` deletes to invalidate the cache.

    :return: tuple of node indexes, predecessors before successors.
    :raises RuntimeError: if the graph contains a cycle.
    """
    try:
        return self._topologcally_sorted_indexes_cache
    except AttributeError:
        # Cache miss. Compute *outside* this handler so that errors raised
        # below (notably the cycle RuntimeError) are not reported as having
        # happened "during handling of" this irrelevant AttributeError.
        pass

    on_path = set()  # nodes entered but not finished, i.e. ancestors on the DFS path
    finished = set()  # nodes whose whole subtree has been emitted
    postorder = []

    for root in self.edges:
        if root in finished:
            continue
        stack = [root]
        while stack:
            current = stack[-1]  # depth first: always look at the top of the stack
            if current in finished:  # reached earlier through another branch
                stack.pop()
                continue
            on_path.add(current)

            pending = []
            for successor in self.outputs_of(current):
                if successor in finished:
                    continue
                if successor in on_path:
                    # An unfinished successor is one of our own ancestors.
                    raise RuntimeError("Graph contains a cycle.")
                pending.append(successor)

            if pending:
                # Visit the children first; `current` stays on the stack and
                # is re-examined once they are all finished.
                stack.extend(pending)
                continue

            finished.add(current)
            postorder.append(current)
            stack.pop()

    # Reversed post-order is a topological order; keep only real node indexes.
    ordered = tuple(index for index in reversed(postorder) if type(index) is int)
    self._topologcally_sorted_indexes_cache = ordered
    return ordered
def _resolve_index(self, mixed):
    """Resolve a chain input/output reference to a key usable with ``self.edges``.

    Strategies are tried in this order:

    1. ``None`` is passed through unchanged.
    2. A plain ``int`` is taken to be a node index and returned as is (it is
       not range-checked here).
    3. Any value that is already a key of ``self.edges`` is returned as is.
    4. A string registered in ``self.named`` resolves to the index stored
       under that name.
    5. A value found in ``self.nodes`` resolves to its first position there.

    :param mixed: index, edge key, chain name or node value.
    :return: the resolved key, or ``None`` if ``mixed`` is ``None``.
    :raises ValueError: if no strategy matches.
    """
    if mixed is None:
        return None

    if type(mixed) is int:
        return mixed

    try:
        if mixed in self.edges:
            return mixed
    except TypeError:
        # Unhashable values cannot be edge keys, but may still be node
        # values; fall through instead of crashing on the dict lookup.
        pass

    if isinstance(mixed, str) and mixed in self.named:
        return self.named[mixed]

    try:
        # Single scan of the node list (instead of `in` followed by `index`).
        return self.nodes.index(mixed)
    except ValueError:
        raise ValueError('Cannot find node matching {!r}.'.format(mixed)) from None

View File

@ -200,6 +200,9 @@ class ValueHolder:
def __invert__(self):
return ~self.value
def __len__(self):
return len(self.value)
def get_attribute_or_create(obj, attr, default):
try:
@ -207,4 +210,3 @@ def get_attribute_or_create(obj, attr, default):
except AttributeError:
setattr(obj, attr, default)
return getattr(obj, attr)

22
bonobo/util/python.py Normal file
View File

@ -0,0 +1,22 @@
import inspect
import os
import runpy
class _RequiredModule:
    """Attribute-style view over the globals produced by running a file."""

    def __init__(self, dct):
        # Adopt the globals dict directly, so every module-level name of the
        # executed file is reachable as an attribute of this object.
        self.__dict__ = dct


class _RequiredModulesRegistry(dict):
    """Cache of modules loaded from source files, keyed by requested dotted name."""

    def require(self, name):
        """Load ``name`` from a ``.py`` file located relative to the caller's file.

        The dotted ``name`` is mapped to a path (``a.b`` -> ``a/b.py``) under
        the directory containing the source file of the calling frame; the
        file is executed with ``runpy.run_path`` and its globals are wrapped
        in a ``_RequiredModule``.

        NOTE: the cache is keyed by ``name`` only. Once a name has been
        loaded, later calls return the same object regardless of which
        directory the caller lives in.

        :param name: dotted module name, relative to the caller's directory.
        :return: the cached ``_RequiredModule`` for ``name``.
        """
        if name not in self:
            # Only the immediate caller's filename is needed. Walking one
            # frame up avoids inspect.stack(), which builds a record (with
            # source context) for every frame on the stack.
            caller = inspect.currentframe().f_back
            try:
                caller_file = inspect.getfile(caller)
            finally:
                # Do not keep the frame alive longer than necessary.
                del caller

            pathname = os.path.join(os.getcwd(), os.path.dirname(caller_file))
            filename = os.path.join(pathname, *name.split('.')) + '.py'
            self[name] = _RequiredModule(runpy.run_path(filename, run_name=name))
        return self[name]


registry = _RequiredModulesRegistry()
require = registry.require