diff --git a/.gitignore b/.gitignore index a88bd7b..d48b40b 100644 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,7 @@ /examples/private /htmlcov/ /sdist/ +/tags celerybeat-schedule parts/ pip-delete-this-directory.txt diff --git a/bonobo/config/configurables.py b/bonobo/config/configurables.py index c1486ee..64b4adc 100644 --- a/bonobo/config/configurables.py +++ b/bonobo/config/configurables.py @@ -85,3 +85,8 @@ class Configurable(metaclass=ConfigurableMeta): # set option values. for name, value in kwargs.items(): setattr(self, name, value) + + def __call__(self, *args, **kwargs): + """ You can implement a configurable callable behaviour by implementing the call(...) method. Of course, it is also backward compatible with legacy __call__ override. + """ + return self.call(*args, **kwargs) diff --git a/bonobo/examples/__init__.py b/bonobo/examples/__init__.py index cf3d84d..49b1544 100644 --- a/bonobo/examples/__init__.py +++ b/bonobo/examples/__init__.py @@ -5,9 +5,19 @@ def require(package, requirement=None): return __import__(package) except ImportError: from colorama import Fore, Style - print(Fore.YELLOW, 'This example requires the {!r} package. Install it using:'.format(requirement), - Style.RESET_ALL, sep='') + print( + Fore.YELLOW, + 'This example requires the {!r} package. Install it using:'. 
+ format(requirement), + Style.RESET_ALL, + sep='' + ) print() - print(Fore.YELLOW, ' $ pip install {!s}'.format(requirement), Style.RESET_ALL, sep='') + print( + Fore.YELLOW, + ' $ pip install {!s}'.format(requirement), + Style.RESET_ALL, + sep='' + ) print() - raise \ No newline at end of file + raise diff --git a/bonobo/execution/base.py b/bonobo/execution/base.py index e1b9bb0..6ca22f2 100644 --- a/bonobo/execution/base.py +++ b/bonobo/execution/base.py @@ -87,7 +87,6 @@ class LoopingExecutionContext(Wrapper): finally: self._stopped = True - def handle_error(self, exc, trace): return print_error(exc, trace, context=self.wrapped) diff --git a/bonobo/execution/graph.py b/bonobo/execution/graph.py index 2e55492..1f2671a 100644 --- a/bonobo/execution/graph.py +++ b/bonobo/execution/graph.py @@ -21,7 +21,7 @@ class GraphExecutionContext: def __init__(self, graph, plugins=None, services=None): self.graph = graph - self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph.nodes] + self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph] self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()] self.services = Container(services) if services else Container() @@ -65,4 +65,4 @@ class GraphExecutionContext: def stop(self): # todo use strategy for node in self.nodes: - node.stop() \ No newline at end of file + node.stop() diff --git a/bonobo/ext/console/plugin.py b/bonobo/ext/console/plugin.py index d76b3af..8884a73 100644 --- a/bonobo/ext/console/plugin.py +++ b/bonobo/ext/console/plugin.py @@ -73,19 +73,19 @@ class ConsoleOutputPlugin(Plugin): def write(context, prefix='', rewind=True, append=None, debug=False, profile=False): t_cnt = len(context) - for i, node in enumerate(context): + for i in context.graph.topologically_sorted_indexes: + node = context[i] if node.alive: _line = ''.join( ( - Fore.BLACK, '({})'.format(i + 1), Style.RESET_ALL, ' ', Style.BRIGHT, '+', Style.RESET_ALL, ' ', - node.name, 
' ', node.get_statistics_as_string(debug=debug, - profile=profile), Style.RESET_ALL, ' ', + ' ', Style.BRIGHT, '+', Style.RESET_ALL, ' ', node.name, '(', str(i), ') ', + node.get_statistics_as_string(debug=debug, profile=profile), Style.RESET_ALL, ' ', ) ) else: _line = ''.join( ( - Fore.BLACK, '({})'.format(i + 1), ' - ', node.name, ' ', + ' ', Fore.BLACK, '-', ' ', node.name, '(', str(i), ') ', node.get_statistics_as_string(debug=debug, profile=profile), Style.RESET_ALL, ' ', ) ) diff --git a/bonobo/io/csv.py b/bonobo/io/csv.py index 30ad3a4..647925b 100644 --- a/bonobo/io/csv.py +++ b/bonobo/io/csv.py @@ -55,7 +55,7 @@ class CsvReader(CsvHandler, FileReader): for row in reader: if len(row) != field_count: - raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count,)) + raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, )) yield dict(zip(headers.value, row)) @@ -74,5 +74,3 @@ class CsvWriter(CsvHandler, FileWriter): writer.writerow(row[header] for header in headers.value) lineno.value += 1 return NOT_MODIFIED - - diff --git a/bonobo/structs/bags.py b/bonobo/structs/bags.py index e1fc442..3414d00 100644 --- a/bonobo/structs/bags.py +++ b/bonobo/structs/bags.py @@ -43,7 +43,7 @@ class Bag: def args(self): if self._parent is None: return self._args - return (*self._parent.args, *self._args,) + return (*self._parent.args, *self._args, ) @property def kwargs(self): @@ -85,7 +85,7 @@ class Bag: @classmethod def inherit(cls, *args, **kwargs): - return cls(*args, _flags=(INHERIT_INPUT,), **kwargs) + return cls(*args, _flags=(INHERIT_INPUT, ), **kwargs) def __eq__(self, other): return isinstance(other, Bag) and other.args == self.args and other.kwargs == self.kwargs @@ -93,7 +93,7 @@ class Bag: def __repr__(self): return '<{} ({})>'.format( type(self).__name__, ', '. 
- join(itertools.chain( + join(itertools.chain( map(repr, self.args), ('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()), )) diff --git a/bonobo/structs/graphs.py b/bonobo/structs/graphs.py index 8fcc0e6..ccafb6b 100644 --- a/bonobo/structs/graphs.py +++ b/bonobo/structs/graphs.py @@ -3,35 +3,115 @@ from bonobo.constants import BEGIN class Graph: """ - Represents a coherent directed acyclic graph of components. + Represents a directed graph of nodes. """ def __init__(self, *chain): + self.edges = {BEGIN: set()} + self.named = {} self.nodes = [] - self.graph = {BEGIN: set()} self.add_chain(*chain) + def __iter__(self): + yield from self.nodes + + def __len__(self): + """ Node count. + """ + return len(self.nodes) + + def __getitem__(self, key): + return self.nodes[key] + def outputs_of(self, idx, create=False): - if create and not idx in self.graph: - self.graph[idx] = set() - return self.graph[idx] + """ Get a set of the outputs for a given node index. + """ + if create and not idx in self.edges: + self.edges[idx] = set() + return self.edges[idx] def add_node(self, c): - i = len(self.nodes) + """ Add a node without connections in this graph and return its index. + """ + idx = len(self.nodes) + self.edges[idx] = set() self.nodes.append(c) - return i + return idx - def add_chain(self, *nodes, _input=BEGIN, _output=None): - for node in nodes: - _next = self.add_node(node) - self.outputs_of(_input, create=True).add(_next) - _input = _next - if _output: - if not _output in self.nodes: - raise ValueError('Output not found.') - self.outputs_of(_input, create=True).add(self.nodes.index(_output)) + def add_chain(self, *nodes, _input=BEGIN, _output=None, _name=None): + """ Add a chain in this graph. 
+ """ + if len(nodes): + _input = self._resolve_index(_input) + _output = self._resolve_index(_output) + + for i, node in enumerate(nodes): + _next = self.add_node(node) + if not i and _name: + if _name in self.named: + raise KeyError('Duplicate name {!r} in graph.'.format(_name)) + self.named[_name] = _next + self.outputs_of(_input, create=True).add(_next) + _input = _next + + if _output is not None: + self.outputs_of(_input, create=True).add(_output) + + if hasattr(self, '_topologcally_sorted_indexes_cache'): + del self._topologcally_sorted_indexes_cache return self - def __len__(self): - return len(self.nodes) + @property + def topologically_sorted_indexes(self): + """Iterate in topological order, based on networkx's topological_sort() function. + """ + try: + return self._topologcally_sorted_indexes_cache + except AttributeError: + seen = set() + order = [] + explored = set() + + for i in self.edges: + if i in explored: + continue + fringe = [i] + while fringe: + w = fringe[-1] # depth first search + if w in explored: # already looked down this branch + fringe.pop() + continue + seen.add(w) # mark as seen + # Check successors for cycles and for new nodes + new_nodes = [] + for n in self.outputs_of(w): + if n not in explored: + if n in seen: # CYCLE !! + raise RuntimeError("Graph contains a cycle.") + new_nodes.append(n) + if new_nodes: # Add new_nodes to fringe + fringe.extend(new_nodes) + else: # No new nodes so w is fully explored + explored.add(w) + order.append(w) + fringe.pop() # done considering this node + self._topologcally_sorted_indexes_cache = tuple(filter(lambda i: type(i) is int, reversed(order))) + return self._topologcally_sorted_indexes_cache + + def _resolve_index(self, mixed): + """ Find the index based on various strategies for a node, probably an input or output of chain. Supported inputs are indexes, node values or names. 
+ """ + if mixed is None: + return None + + if type(mixed) is int or mixed in self.edges: + return mixed + + if isinstance(mixed, str) and mixed in self.named: + return self.named[mixed] + + if mixed in self.nodes: + return self.nodes.index(mixed) + + raise ValueError('Cannot find node matching {!r}.'.format(mixed)) diff --git a/bonobo/util/objects.py b/bonobo/util/objects.py index 8b5db00..1e0015a 100644 --- a/bonobo/util/objects.py +++ b/bonobo/util/objects.py @@ -200,6 +200,9 @@ class ValueHolder: def __invert__(self): return ~self.value + def __len__(self): + return len(self.value) + def get_attribute_or_create(obj, attr, default): try: @@ -207,4 +210,3 @@ def get_attribute_or_create(obj, attr, default): except AttributeError: setattr(obj, attr, default) return getattr(obj, attr) - diff --git a/bonobo/util/python.py b/bonobo/util/python.py new file mode 100644 index 0000000..a496e19 --- /dev/null +++ b/bonobo/util/python.py @@ -0,0 +1,22 @@ +import inspect +import os +import runpy + + +class _RequiredModule: + def __init__(self, dct): + self.__dict__ = dct + + +class _RequiredModulesRegistry(dict): + def require(self, name): + if name not in self: + bits = name.split('.') + pathname = os.path.join(os.getcwd(), os.path.dirname(inspect.getfile(inspect.stack()[1][0]))) + filename = os.path.join(pathname, *bits[:-1], bits[-1] + '.py') + self[name] = _RequiredModule(runpy.run_path(filename, run_name=name)) + return self[name] + + +registry = _RequiredModulesRegistry() +require = registry.require diff --git a/setup.py b/setup.py index 4cd8d82..844240a 100644 --- a/setup.py +++ b/setup.py @@ -36,44 +36,41 @@ else: setup( name='bonobo', - description= - ('Bonobo, a simple, modern and atomic extract-transform-load toolkit for ' - 'python 3.5+.'), + description=('Bonobo, a simple, modern and atomic extract-transform-load toolkit for ' + 'python 3.5+.'), license='Apache License, Version 2.0', install_requires=[ - 'colorama >=0.3,<1.0', 'fs >=2.0,<3.0', 'psutil 
>=5.2,<6.0', - 'requests >=2.0,<3.0', 'stevedore >=1.21,<2.0' + 'colorama >=0.3,<1.0', 'fs >=2.0,<3.0', 'psutil >=5.2,<6.0', 'requests >=2.0,<3.0', 'stevedore >=1.21,<2.0' ], version=version, long_description=long_description, classifiers=classifiers, packages=find_packages(exclude=['ez_setup', 'example', 'test']), include_package_data=True, - data_files=[('share/jupyter/nbextensions/bonobo-jupyter', [ - 'bonobo/ext/jupyter/static/extension.js', - 'bonobo/ext/jupyter/static/index.js', - 'bonobo/ext/jupyter/static/index.js.map' - ])], + data_files=[ + ( + 'share/jupyter/nbextensions/bonobo-jupyter', [ + 'bonobo/ext/jupyter/static/extension.js', 'bonobo/ext/jupyter/static/index.js', + 'bonobo/ext/jupyter/static/index.js.map' + ] + ) + ], extras_require={ 'dev': [ - 'coverage >=4,<5', 'pylint >=1,<2', 'pytest >=3,<4', - 'pytest-cov >=2,<3', 'pytest-timeout >=1,<2', 'sphinx', + 'coverage >=4,<5', 'pylint >=1,<2', 'pytest >=3,<4', 'pytest-cov >=2,<3', 'pytest-timeout >=1,<2', 'sphinx', 'sphinx_rtd_theme', 'yapf' ], 'jupyter': ['jupyter >=1.0,<1.1', 'ipywidgets >=6.0.0.beta5'] }, entry_points={ 'bonobo.commands': [ - 'init = bonobo.commands.init:register', - 'run = bonobo.commands.run:register', + 'init = bonobo.commands.init:register', 'run = bonobo.commands.run:register', 'version = bonobo.commands.version:register' ], 'console_scripts': ['bonobo = bonobo.commands:entrypoint'], - 'edgy.project.features': - ['bonobo = ' - 'bonobo.ext.edgy.project.feature:BonoboFeature'] + 'edgy.project.features': ['bonobo = ' + 'bonobo.ext.edgy.project.feature:BonoboFeature'] }, url='https://www.bonobo-project.org/', - download_url= - 'https://github.com/python-bonobo/bonobo/tarball/{version}'.format( - version=version), ) + download_url='https://github.com/python-bonobo/bonobo/tarball/{version}'.format(version=version), +) diff --git a/tests/structs/test_bags.py b/tests/structs/test_bags.py index 1d44026..fbb4f5b 100644 --- a/tests/structs/test_bags.py +++ b/tests/structs/test_bags.py 
@@ -5,7 +5,7 @@ from bonobo import Bag from bonobo.constants import INHERIT_INPUT from bonobo.structs import Token -args = ('foo', 'bar',) +args = ('foo', 'bar', ) kwargs = dict(acme='corp') @@ -34,29 +34,29 @@ def test_inherit(): bag3 = bag.extend('c', c=3) bag4 = Bag('d', d=4) - assert bag.args == ('a',) + assert bag.args == ('a', ) assert bag.kwargs == {'a': 1} assert bag.flags is () - assert bag2.args == ('a', 'b',) + assert bag2.args == ('a', 'b', ) assert bag2.kwargs == {'a': 1, 'b': 2} assert INHERIT_INPUT in bag2.flags - assert bag3.args == ('a', 'c',) + assert bag3.args == ('a', 'c', ) assert bag3.kwargs == {'a': 1, 'c': 3} assert bag3.flags is () - assert bag4.args == ('d',) + assert bag4.args == ('d', ) assert bag4.kwargs == {'d': 4} assert bag4.flags is () bag4.set_parent(bag) - assert bag4.args == ('a', 'd',) + assert bag4.args == ('a', 'd', ) assert bag4.kwargs == {'a': 1, 'd': 4} assert bag4.flags is () bag4.set_parent(bag3) - assert bag4.args == ('a', 'c', 'd',) + assert bag4.args == ('a', 'c', 'd', ) assert bag4.kwargs == {'a': 1, 'c': 3, 'd': 4} assert bag4.flags is () diff --git a/tests/structs/test_graphs.py b/tests/structs/test_graphs.py index c1c29c2..3afeaee 100644 --- a/tests/structs/test_graphs.py +++ b/tests/structs/test_graphs.py @@ -1,5 +1,7 @@ import pytest +from unittest.mock import sentinel + from bonobo.constants import BEGIN from bonobo.structs import Graph @@ -41,3 +43,30 @@ def test_graph_add_chain(): g.add_chain(identity, identity, identity) assert len(g.nodes) == 3 assert len(g.outputs_of(BEGIN)) == 1 + + +def test_graph_topological_sort(): + g = Graph() + + g.add_chain( + sentinel.a1, + sentinel.a2, + sentinel.a3, + _input=None, + _output=None, + ) + + assert g.topologically_sorted_indexes == (0, 1, 2) + assert g[0] == sentinel.a1 + assert g[1] == sentinel.a2 + assert g[2] == sentinel.a3 + + g.add_chain( + sentinel.b1, + sentinel.b2, + _output=sentinel.a2, + ) + + assert g.topologically_sorted_indexes == (0, 3, 4, 1, 2) + 
assert g[3] == sentinel.b1 + assert g[4] == sentinel.b2 diff --git a/tests/util/requireable/dummy.py b/tests/util/requireable/dummy.py new file mode 100644 index 0000000..1ce8ef1 --- /dev/null +++ b/tests/util/requireable/dummy.py @@ -0,0 +1 @@ +foo = 'bar' diff --git a/tests/util/test_python.py b/tests/util/test_python.py new file mode 100644 index 0000000..6b1b591 --- /dev/null +++ b/tests/util/test_python.py @@ -0,0 +1,6 @@ +from bonobo.util.python import require + + +def test_require(): + dummy = require('requireable.dummy') + assert dummy.foo == 'bar'