From 171fa3415be0ecdc50a4b6b5a2189089994574d9 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Mon, 8 May 2017 11:33:02 +0200 Subject: [PATCH] first draft of enhancers. --- bonobo/core/statistics.py | 3 ++- bonobo/examples/__init__.py | 13 ++++++++++ bonobo/execution/base.py | 52 +++++++++++++++++++++++-------------- bonobo/execution/graph.py | 4 +-- bonobo/execution/node.py | 47 ++++++++++++++++++--------------- bonobo/execution/plugin.py | 2 +- bonobo/plugins.py | 17 ++++++++++++ bonobo/structs/bags.py | 9 ++++--- bonobo/util/helpers.py | 5 ++++ bonobo/util/objects.py | 9 +++++++ docs/guide/plugins.rst | 17 ++++++++++++ docs/roadmap.rst | 30 +++++++++++++++++++-- tests/io/test_csv.py | 4 +-- tests/io/test_file.py | 4 +-- tests/io/test_json.py | 4 +-- tests/structs/test_bags.py | 39 +++++++++++++++++++++++----- 16 files changed, 197 insertions(+), 62 deletions(-) create mode 100644 docs/guide/plugins.rst diff --git a/bonobo/core/statistics.py b/bonobo/core/statistics.py index 12d040a..5d71a0f 100644 --- a/bonobo/core/statistics.py +++ b/bonobo/core/statistics.py @@ -24,7 +24,8 @@ class WithStatistics: return ((name, self.statistics[name]) for name in self.statistics_names) def get_statistics_as_string(self, *args, **kwargs): - return ' '.join(('{0}={1}'.format(name, cnt) for name, cnt in self.get_statistics(*args, **kwargs) if cnt > 0)) + stats = tuple('{0}={1}'.format(name, cnt) for name, cnt in self.get_statistics(*args, **kwargs) if cnt > 0) + return (kwargs.get('prefix', '') + ' '.join(stats)) if len(stats) else '' def increment(self, name): self.statistics[name] += 1 diff --git a/bonobo/examples/__init__.py b/bonobo/examples/__init__.py index e69de29..cf3d84d 100644 --- a/bonobo/examples/__init__.py +++ b/bonobo/examples/__init__.py @@ -0,0 +1,13 @@ +def require(package, requirement=None): + requirement = requirement or package + + try: + return __import__(package) + except ImportError: + from colorama import Fore, Style + print(Fore.YELLOW, 'This example requires the {!r} package. Install it using:'.format(requirement), + Style.RESET_ALL, sep='') + print() + print(Fore.YELLOW, ' $ pip install {!s}'.format(requirement), Style.RESET_ALL, sep='') + print() + raise \ No newline at end of file diff --git a/bonobo/execution/base.py b/bonobo/execution/base.py index 3a85145..85666ac 100644 --- a/bonobo/execution/base.py +++ b/bonobo/execution/base.py @@ -1,21 +1,27 @@ import traceback +from contextlib import contextmanager from time import sleep from bonobo.config import Container -from bonobo.config.processors import resolve_processors, ContextCurrifier +from bonobo.config.processors import ContextCurrifier +from bonobo.plugins import get_enhancers from bonobo.util.errors import print_error -from bonobo.util.iterators import ensure_tuple -from bonobo.util.objects import Wrapper +from bonobo.util.objects import Wrapper, get_name + + +@contextmanager +def unrecoverable(error_handler): + try: + yield + except Exception as exc: # pylint: disable=broad-except + error_handler(exc, traceback.format_exc()) + raise # raise unrecoverableerror from x ? class LoopingExecutionContext(Wrapper): alive = True PERIOD = 0.25 - @property - def state(self): - return self._started, self._stopped - @property def started(self): return self._started @@ -26,7 +32,9 @@ class LoopingExecutionContext(Wrapper): def __init__(self, wrapped, parent, services=None): super().__init__(wrapped) + self.parent = parent + if services: if parent: raise RuntimeError( @@ -36,19 +44,25 @@ class LoopingExecutionContext(Wrapper): else: self.services = None - self._started, self._stopped, self._stack = False, False, None + self._started, self._stopped = False, False + self._stack = None + + # XXX enhancers + self._enhancers = get_enhancers(self.wrapped) def start(self): - assert self.state == (False, - False), ('{}.start() can only be called on a new node.').format(type(self).__name__) + if self.started: + raise RuntimeError('Cannot start a node twice ({}).'.format(get_name(self))) + self._started = True self._stack = ContextCurrifier(self.wrapped, *self._get_initial_context()) - try: + with unrecoverable(self.handle_error): self._stack.setup(self) - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - raise + + for enhancer in self._enhancers: + with unrecoverable(self.handle_error): + enhancer.start(self) def loop(self): """Generic loop. A bit boring. """ @@ -61,16 +75,16 @@ class LoopingExecutionContext(Wrapper): raise NotImplementedError('Abstract.') def stop(self): - assert self._started, ('{}.stop() can only be called on a previously started node.').format(type(self).__name__) + if not self.started: + raise RuntimeError('Cannot stop an unstarted node ({}).'.format(get_name(self))) + if self._stopped: return self._stopped = True - try: + + with unrecoverable(self.handle_error): self._stack.teardown() - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - raise def handle_error(self, exc, trace): return print_error(exc, trace, context=self.wrapped) diff --git a/bonobo/execution/graph.py b/bonobo/execution/graph.py index 6f95ac3..2e55492 100644 --- a/bonobo/execution/graph.py +++ b/bonobo/execution/graph.py @@ -50,7 +50,7 @@ class GraphExecutionContext: for i in self.graph.outputs_of(BEGIN): for message in messages: - self[i].recv(message) + self[i].write(message) def start(self): # todo use strategy @@ -65,4 +65,4 @@ class GraphExecutionContext: def stop(self): # todo use strategy for node in self.nodes: - node.stop() + node.stop() \ No newline at end of file diff --git a/bonobo/execution/node.py b/bonobo/execution/node.py index fc665af..5969edd 100644 --- a/bonobo/execution/node.py +++ b/bonobo/execution/node.py @@ -8,8 +8,10 @@ from bonobo.core.statistics import WithStatistics from bonobo.errors import InactiveReadableError from bonobo.execution.base import LoopingExecutionContext from bonobo.structs.bags import Bag +from bonobo.util.compat import deprecated_alias from bonobo.util.errors import is_error from bonobo.util.iterators import iter_if_not_sequence +from bonobo.util.objects import get_name class NodeExecutionContext(WithStatistics, LoopingExecutionContext): @@ -22,6 +24,10 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): """todo check if this is right, and where it is used""" return self.input.alive and self._started and not self._stopped + @property + def alive_str(self): + return '+' if self.alive else '-' + def __init__(self, wrapped, parent=None, services=None): LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services) WithStatistics.__init__(self, 'in', 'out', 'err') @@ -30,18 +36,13 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): self.outputs = [] def __str__(self): - return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip() + return self.alive_str + ' ' + self.__name__ + self.get_statistics_as_string(prefix=' ') def __repr__(self): - stats = self.get_statistics_as_string().strip() - return '<{}({}{}){}>'.format( - type(self).__name__, - '+' if self.alive else '', - self.__name__, - (' ' + stats) if stats else '', - ) + name, type_name = get_name(self), get_name(type(self)) + return '<{}({}{}){}>'.format(type_name, self.alive_str, name, self.get_statistics_as_string(prefix=' ')) - def recv(self, *messages): + def write(self, *messages): # XXX write() ? ( node.write(...) ) """ Push a message list to this context's input queue. @@ -50,19 +51,29 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): for message in messages: self.input.put(message) - def send(self, value, _control=False): + # XXX deprecated alias + recv = deprecated_alias('recv', write) + + def send(self, value, _control=False): # XXX self.send(....) """ Sends a message to all of this context's outputs. :param mixed value: message :param _control: if true, won't count in statistics. """ + if not _control: self.increment('out') - for output in self.outputs: - output.put(value) - def get(self): + if is_error(value): + value.apply(self.handle_error) + else: + for output in self.outputs: + output.put(value) + + push = deprecated_alias('push', send) + + def get(self): # recv() ? input_data = self.receive() """ Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed. @@ -95,12 +106,6 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): # todo add timer self.handle_results(input_bag, input_bag.apply(self._stack)) - def push(self, bag): - # MAKE THIS PUBLIC API FOR CONTEXT PROCESSORS !!! - # xxx handle error or send in first call to apply(...)? - # xxx return value ? - bag.apply(self.handle_error) if is_error(bag) else self.send(bag) - def handle_results(self, input_bag, results): # self._exec_time += timer.duration # Put data onto output channels @@ -108,7 +113,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): results = iter_if_not_sequence(results) except TypeError: # not an iterator if results: - self.push(_resolve(input_bag, results)) + self.send(_resolve(input_bag, results)) else: # case with no result, an execution went through anyway, use for stats. # self._exec_count += 1 @@ -120,7 +125,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): except StopIteration: break else: - self.push(_resolve(input_bag, result)) + self.send(_resolve(input_bag, result)) def _resolve(input_bag, output): diff --git a/bonobo/execution/plugin.py b/bonobo/execution/plugin.py index ec1c5bd..db5c0db 100644 --- a/bonobo/execution/plugin.py +++ b/bonobo/execution/plugin.py @@ -31,4 +31,4 @@ class PluginExecutionContext(LoopingExecutionContext): try: self.wrapped.run() except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) \ No newline at end of file + self.handle_error(exc, traceback.format_exc()) diff --git a/bonobo/plugins.py b/bonobo/plugins.py index a719500..4fa1e18 100644 --- a/bonobo/plugins.py +++ b/bonobo/plugins.py @@ -1,3 +1,7 @@ +from bonobo.config import Configurable +from bonobo.util.objects import get_attribute_or_create + + class Plugin: """ A plugin is an extension to the core behavior of bonobo. If you're writing transformations, you should not need @@ -21,3 +25,16 @@ class Plugin: def finalize(self): pass + + +def get_enhancers(obj): + try: + return get_attribute_or_create(obj, '__enhancers__', list()) + except AttributeError: + return list() + + +class NodeEnhancer(Configurable): + def __matmul__(self, other): + get_enhancers(other).append(self) + return other diff --git a/bonobo/structs/bags.py b/bonobo/structs/bags.py index 1d4a7e8..e1fc442 100644 --- a/bonobo/structs/bags.py +++ b/bonobo/structs/bags.py @@ -43,7 +43,7 @@ class Bag: def args(self): if self._parent is None: return self._args - return (*self._parent.args, *self._args, ) + return (*self._parent.args, *self._args,) @property def kwargs(self): @@ -85,12 +85,15 @@ class Bag: @classmethod def inherit(cls, *args, **kwargs): - return cls(*args, _flags=(INHERIT_INPUT, ), **kwargs) + return cls(*args, _flags=(INHERIT_INPUT,), **kwargs) + + def __eq__(self, other): + return isinstance(other, Bag) and other.args == self.args and other.kwargs == self.kwargs def __repr__(self): return '<{} ({})>'.format( type(self).__name__, ', '. - join(itertools.chain( + join(itertools.chain( map(repr, self.args), ('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()), )) diff --git a/bonobo/util/helpers.py b/bonobo/util/helpers.py index c5d468a..227fdbd 100644 --- a/bonobo/util/helpers.py +++ b/bonobo/util/helpers.py @@ -1,3 +1,7 @@ +from bonobo.util.compat import deprecated + + +@deprecated def console_run(*chain, output=True, plugins=None, strategy=None): from bonobo import run from bonobo.ext.console import ConsoleOutputPlugin @@ -5,6 +9,7 @@ def console_run(*chain, output=True, plugins=None, strategy=None): return run(*chain, plugins=(plugins or []) + [ConsoleOutputPlugin()] if output else [], strategy=strategy) +@deprecated def jupyter_run(*chain, plugins=None, strategy=None): from bonobo import run from bonobo.ext.jupyter import JupyterOutputPlugin diff --git a/bonobo/util/objects.py b/bonobo/util/objects.py index bb06cc4..8b5db00 100644 --- a/bonobo/util/objects.py +++ b/bonobo/util/objects.py @@ -199,3 +199,12 @@ class ValueHolder: def __invert__(self): return ~self.value + + +def get_attribute_or_create(obj, attr, default): + try: + return getattr(obj, attr) + except AttributeError: + setattr(obj, attr, default) + return getattr(obj, attr) + diff --git a/docs/guide/plugins.rst b/docs/guide/plugins.rst new file mode 100644 index 0000000..6426be8 --- /dev/null +++ b/docs/guide/plugins.rst @@ -0,0 +1,17 @@ +Plugins +======= + + +Graph level plugins +::::::::::::::::::: + + +Node level plugins +:::::::::::::::::: + +enhancers + + +node + - + diff --git a/docs/roadmap.rst b/docs/roadmap.rst index 23993ae..4bfcc91 100644 --- a/docs/roadmap.rst +++ b/docs/roadmap.rst @@ -1,6 +1,27 @@ Detailed roadmap ================ +initialize / finalize better than start / stop ? + +Graph and node level plugins +:::::::::::::::::::::::::::: + + * Enhancers or nide-level plugins + * Graph level plugins + * Documentation + +Command line interface and environment +:::::::::::::::::::::::::::::::::::::: + +* How do we manage environment ? .env ? +* How do we configure plugins ? +* Console run should allow console plugin as a command line argument (or silence it). + +Services and Processors +::::::::::::::::::::::: + +* ContextProcessors not clean + Next... ::::::: @@ -10,8 +31,13 @@ Next... * Windows break because of readme encoding. Fix in edgy. * bonobo init --with sqlalchemy,docker * logger, vebosity level -* Console run should allow console plugin as a command line argument (or silence it). -* ContextProcessors not clean + + +External libs that looks good +::::::::::::::::::::::::::::: + +* dask.distributed +* mediator (event dispatcher) Version 0.3 ::::::::::: diff --git a/tests/io/test_csv.py b/tests/io/test_csv.py index 06f3d40..59f7197 100644 --- a/tests/io/test_csv.py +++ b/tests/io/test_csv.py @@ -12,7 +12,7 @@ def test_write_csv_to_file(tmpdir): writer = CsvWriter(path=filename) context = NodeExecutionContext(writer, services={'fs': fs}) - context.recv(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END) + context.write(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END) context.start() context.step() @@ -34,7 +34,7 @@ def test_read_csv_from_file(tmpdir): context = CapturingNodeExecutionContext(reader, services={'fs': fs}) context.start() - context.recv(BEGIN, Bag(), END) + context.write(BEGIN, Bag(), END) context.step() context.stop() diff --git a/tests/io/test_file.py b/tests/io/test_file.py index 918b27f..a6ac8c4 100644 --- a/tests/io/test_file.py +++ b/tests/io/test_file.py @@ -20,7 +20,7 @@ def test_file_writer_in_context(tmpdir, lines, output): context = NodeExecutionContext(writer, services={'fs': fs}) context.start() - context.recv(BEGIN, *map(Bag, lines), END) + context.write(BEGIN, *map(Bag, lines), END) for _ in range(len(lines)): context.step() context.stop() @@ -48,7 +48,7 @@ def test_file_reader_in_context(tmpdir): context = CapturingNodeExecutionContext(reader, services={'fs': fs}) context.start() - context.recv(BEGIN, Bag(), END) + context.write(BEGIN, Bag(), END) context.step() context.stop() diff --git a/tests/io/test_json.py b/tests/io/test_json.py index c53ff0c..442397d 100644 --- a/tests/io/test_json.py +++ b/tests/io/test_json.py @@ -13,7 +13,7 @@ def test_write_json_to_file(tmpdir): context = NodeExecutionContext(writer, services={'fs': fs}) context.start() - context.recv(BEGIN, Bag({'foo': 'bar'}), END) + context.write(BEGIN, Bag({'foo': 'bar'}), END) context.step() context.stop() @@ -34,7 +34,7 @@ def test_read_json_from_file(tmpdir): context = CapturingNodeExecutionContext(reader, services={'fs': fs}) context.start() - context.recv(BEGIN, Bag(), END) + context.write(BEGIN, Bag(), END) context.step() context.stop() diff --git a/tests/structs/test_bags.py b/tests/structs/test_bags.py index 14f49c5..1d44026 100644 --- a/tests/structs/test_bags.py +++ b/tests/structs/test_bags.py @@ -1,9 +1,11 @@ from unittest.mock import Mock +import pickle from bonobo import Bag from bonobo.constants import INHERIT_INPUT +from bonobo.structs import Token -args = ('foo', 'bar', ) +args = ('foo', 'bar',) kwargs = dict(acme='corp') @@ -32,33 +34,56 @@ def test_inherit(): bag3 = bag.extend('c', c=3) bag4 = Bag('d', d=4) - assert bag.args == ('a', ) + assert bag.args == ('a',) assert bag.kwargs == {'a': 1} assert bag.flags is () - assert bag2.args == ('a', 'b', ) + assert bag2.args == ('a', 'b',) assert bag2.kwargs == {'a': 1, 'b': 2} assert INHERIT_INPUT in bag2.flags - assert bag3.args == ('a', 'c', ) + assert bag3.args == ('a', 'c',) assert bag3.kwargs == {'a': 1, 'c': 3} assert bag3.flags is () - assert bag4.args == ('d', ) + assert bag4.args == ('d',) assert bag4.kwargs == {'d': 4} assert bag4.flags is () bag4.set_parent(bag) - assert bag4.args == ('a', 'd', ) + assert bag4.args == ('a', 'd',) assert bag4.kwargs == {'a': 1, 'd': 4} assert bag4.flags is () bag4.set_parent(bag3) - assert bag4.args == ('a', 'c', 'd', ) + assert bag4.args == ('a', 'c', 'd',) assert bag4.kwargs == {'a': 1, 'c': 3, 'd': 4} assert bag4.flags is () +def test_pickle(): + bag1 = Bag('a', a=1) + bag2 = Bag.inherit('b', b=2, _parent=bag1) + bag3 = bag1.extend('c', c=3) + bag4 = Bag('d', d=4) + + # XXX todo this probably won't work with inheriting bags if parent is not there anymore? maybe that's not true + # because the parent may be in the serialization output but we need to verify this assertion. + + for bag in bag1, bag2, bag3, bag4: + pickled = pickle.dumps(bag) + unpickled = pickle.loads(pickled) + assert unpickled == bag + + +def test_eq_operator(): + assert Bag('foo') == Bag('foo') + assert Bag('foo') != Bag('bar') + assert Bag('foo') is not Bag('foo') + assert Bag('foo') != Token('foo') + assert Token('foo') != Bag('foo') + + def test_repr(): bag = Bag('a', a=1) assert repr(bag) == ""