first draft of enhancers.

This commit is contained in:
Romain Dorgueil
2017-05-08 11:33:02 +02:00
parent 6cc3cedf1a
commit 171fa3415b
16 changed files with 197 additions and 62 deletions

View File

@ -24,7 +24,8 @@ class WithStatistics:
return ((name, self.statistics[name]) for name in self.statistics_names) return ((name, self.statistics[name]) for name in self.statistics_names)
def get_statistics_as_string(self, *args, **kwargs): def get_statistics_as_string(self, *args, **kwargs):
return ' '.join(('{0}={1}'.format(name, cnt) for name, cnt in self.get_statistics(*args, **kwargs) if cnt > 0)) stats = tuple('{0}={1}'.format(name, cnt) for name, cnt in self.get_statistics(*args, **kwargs) if cnt > 0)
return (kwargs.get('prefix', '') + ' '.join(stats)) if len(stats) else ''
def increment(self, name): def increment(self, name):
self.statistics[name] += 1 self.statistics[name] += 1

View File

@ -0,0 +1,13 @@
def require(package, requirement=None):
requirement = requirement or package
try:
return __import__(package)
except ImportError:
from colorama import Fore, Style
print(Fore.YELLOW, 'This example requires the {!r} package. Install it using:'.format(requirement),
Style.RESET_ALL, sep='')
print()
print(Fore.YELLOW, ' $ pip install {!s}'.format(requirement), Style.RESET_ALL, sep='')
print()
raise

View File

@ -1,21 +1,27 @@
import traceback import traceback
from contextlib import contextmanager
from time import sleep from time import sleep
from bonobo.config import Container from bonobo.config import Container
from bonobo.config.processors import resolve_processors, ContextCurrifier from bonobo.config.processors import ContextCurrifier
from bonobo.plugins import get_enhancers
from bonobo.util.errors import print_error from bonobo.util.errors import print_error
from bonobo.util.iterators import ensure_tuple from bonobo.util.objects import Wrapper, get_name
from bonobo.util.objects import Wrapper
@contextmanager
def unrecoverable(error_handler):
try:
yield
except Exception as exc: # pylint: disable=broad-except
error_handler(exc, traceback.format_exc())
raise # raise unrecoverableerror from x ?
class LoopingExecutionContext(Wrapper): class LoopingExecutionContext(Wrapper):
alive = True alive = True
PERIOD = 0.25 PERIOD = 0.25
@property
def state(self):
return self._started, self._stopped
@property @property
def started(self): def started(self):
return self._started return self._started
@ -26,7 +32,9 @@ class LoopingExecutionContext(Wrapper):
def __init__(self, wrapped, parent, services=None): def __init__(self, wrapped, parent, services=None):
super().__init__(wrapped) super().__init__(wrapped)
self.parent = parent self.parent = parent
if services: if services:
if parent: if parent:
raise RuntimeError( raise RuntimeError(
@ -36,19 +44,25 @@ class LoopingExecutionContext(Wrapper):
else: else:
self.services = None self.services = None
self._started, self._stopped, self._stack = False, False, None self._started, self._stopped = False, False
self._stack = None
# XXX enhancers
self._enhancers = get_enhancers(self.wrapped)
def start(self): def start(self):
assert self.state == (False, if self.started:
False), ('{}.start() can only be called on a new node.').format(type(self).__name__) raise RuntimeError('Cannot start a node twice ({}).'.format(get_name(self)))
self._started = True self._started = True
self._stack = ContextCurrifier(self.wrapped, *self._get_initial_context()) self._stack = ContextCurrifier(self.wrapped, *self._get_initial_context())
try: with unrecoverable(self.handle_error):
self._stack.setup(self) self._stack.setup(self)
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc()) for enhancer in self._enhancers:
raise with unrecoverable(self.handle_error):
enhancer.start(self)
def loop(self): def loop(self):
"""Generic loop. A bit boring. """ """Generic loop. A bit boring. """
@ -61,16 +75,16 @@ class LoopingExecutionContext(Wrapper):
raise NotImplementedError('Abstract.') raise NotImplementedError('Abstract.')
def stop(self): def stop(self):
assert self._started, ('{}.stop() can only be called on a previously started node.').format(type(self).__name__) if not self.started:
raise RuntimeError('Cannot stop an unstarted node ({}).'.format(get_name(self)))
if self._stopped: if self._stopped:
return return
self._stopped = True self._stopped = True
try:
with unrecoverable(self.handle_error):
self._stack.teardown() self._stack.teardown()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise
def handle_error(self, exc, trace): def handle_error(self, exc, trace):
return print_error(exc, trace, context=self.wrapped) return print_error(exc, trace, context=self.wrapped)

View File

@ -50,7 +50,7 @@ class GraphExecutionContext:
for i in self.graph.outputs_of(BEGIN): for i in self.graph.outputs_of(BEGIN):
for message in messages: for message in messages:
self[i].recv(message) self[i].write(message)
def start(self): def start(self):
# todo use strategy # todo use strategy
@ -65,4 +65,4 @@ class GraphExecutionContext:
def stop(self): def stop(self):
# todo use strategy # todo use strategy
for node in self.nodes: for node in self.nodes:
node.stop() node.stop()

View File

@ -8,8 +8,10 @@ from bonobo.core.statistics import WithStatistics
from bonobo.errors import InactiveReadableError from bonobo.errors import InactiveReadableError
from bonobo.execution.base import LoopingExecutionContext from bonobo.execution.base import LoopingExecutionContext
from bonobo.structs.bags import Bag from bonobo.structs.bags import Bag
from bonobo.util.compat import deprecated_alias
from bonobo.util.errors import is_error from bonobo.util.errors import is_error
from bonobo.util.iterators import iter_if_not_sequence from bonobo.util.iterators import iter_if_not_sequence
from bonobo.util.objects import get_name
class NodeExecutionContext(WithStatistics, LoopingExecutionContext): class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
@ -22,6 +24,10 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
"""todo check if this is right, and where it is used""" """todo check if this is right, and where it is used"""
return self.input.alive and self._started and not self._stopped return self.input.alive and self._started and not self._stopped
@property
def alive_str(self):
return '+' if self.alive else '-'
def __init__(self, wrapped, parent=None, services=None): def __init__(self, wrapped, parent=None, services=None):
LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services) LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services)
WithStatistics.__init__(self, 'in', 'out', 'err') WithStatistics.__init__(self, 'in', 'out', 'err')
@ -30,18 +36,13 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
self.outputs = [] self.outputs = []
def __str__(self): def __str__(self):
return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip() return self.alive_str + ' ' + self.__name__ + self.get_statistics_as_string(prefix=' ')
def __repr__(self): def __repr__(self):
stats = self.get_statistics_as_string().strip() name, type_name = get_name(self), get_name(type(self))
return '<{}({}{}){}>'.format( return '<{}({}{}){}>'.format(type_name, self.alive_str, name, self.get_statistics_as_string(prefix=' '))
type(self).__name__,
'+' if self.alive else '',
self.__name__,
(' ' + stats) if stats else '',
)
def recv(self, *messages): def write(self, *messages): # XXX write() ? ( node.write(...) )
""" """
Push a message list to this context's input queue. Push a message list to this context's input queue.
@ -50,19 +51,29 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
for message in messages: for message in messages:
self.input.put(message) self.input.put(message)
def send(self, value, _control=False): # XXX deprecated alias
recv = deprecated_alias('recv', write)
def send(self, value, _control=False): # XXX self.send(....)
""" """
Sends a message to all of this context's outputs. Sends a message to all of this context's outputs.
:param mixed value: message :param mixed value: message
:param _control: if true, won't count in statistics. :param _control: if true, won't count in statistics.
""" """
if not _control: if not _control:
self.increment('out') self.increment('out')
for output in self.outputs:
output.put(value)
def get(self): if is_error(value):
value.apply(self.handle_error)
else:
for output in self.outputs:
output.put(value)
push = deprecated_alias('push', send)
def get(self): # recv() ? input_data = self.receive()
""" """
Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed. Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed.
@ -95,12 +106,6 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
# todo add timer # todo add timer
self.handle_results(input_bag, input_bag.apply(self._stack)) self.handle_results(input_bag, input_bag.apply(self._stack))
def push(self, bag):
# MAKE THIS PUBLIC API FOR CONTEXT PROCESSORS !!!
# xxx handle error or send in first call to apply(...)?
# xxx return value ?
bag.apply(self.handle_error) if is_error(bag) else self.send(bag)
def handle_results(self, input_bag, results): def handle_results(self, input_bag, results):
# self._exec_time += timer.duration # self._exec_time += timer.duration
# Put data onto output channels # Put data onto output channels
@ -108,7 +113,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
results = iter_if_not_sequence(results) results = iter_if_not_sequence(results)
except TypeError: # not an iterator except TypeError: # not an iterator
if results: if results:
self.push(_resolve(input_bag, results)) self.send(_resolve(input_bag, results))
else: else:
# case with no result, an execution went through anyway, use for stats. # case with no result, an execution went through anyway, use for stats.
# self._exec_count += 1 # self._exec_count += 1
@ -120,7 +125,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
except StopIteration: except StopIteration:
break break
else: else:
self.push(_resolve(input_bag, result)) self.send(_resolve(input_bag, result))
def _resolve(input_bag, output): def _resolve(input_bag, output):

View File

@ -31,4 +31,4 @@ class PluginExecutionContext(LoopingExecutionContext):
try: try:
self.wrapped.run() self.wrapped.run()
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc()) self.handle_error(exc, traceback.format_exc())

View File

@ -1,3 +1,7 @@
from bonobo.config import Configurable
from bonobo.util.objects import get_attribute_or_create
class Plugin: class Plugin:
""" """
A plugin is an extension to the core behavior of bonobo. If you're writing transformations, you should not need A plugin is an extension to the core behavior of bonobo. If you're writing transformations, you should not need
@ -21,3 +25,16 @@ class Plugin:
def finalize(self): def finalize(self):
pass pass
def get_enhancers(obj):
try:
return get_attribute_or_create(obj, '__enhancers__', list())
except AttributeError:
return list()
class NodeEnhancer(Configurable):
def __matmul__(self, other):
get_enhancers(other).append(self)
return other

View File

@ -43,7 +43,7 @@ class Bag:
def args(self): def args(self):
if self._parent is None: if self._parent is None:
return self._args return self._args
return (*self._parent.args, *self._args, ) return (*self._parent.args, *self._args,)
@property @property
def kwargs(self): def kwargs(self):
@ -85,12 +85,15 @@ class Bag:
@classmethod @classmethod
def inherit(cls, *args, **kwargs): def inherit(cls, *args, **kwargs):
return cls(*args, _flags=(INHERIT_INPUT, ), **kwargs) return cls(*args, _flags=(INHERIT_INPUT,), **kwargs)
def __eq__(self, other):
return isinstance(other, Bag) and other.args == self.args and other.kwargs == self.kwargs
def __repr__(self): def __repr__(self):
return '<{} ({})>'.format( return '<{} ({})>'.format(
type(self).__name__, ', '. type(self).__name__, ', '.
join(itertools.chain( join(itertools.chain(
map(repr, self.args), map(repr, self.args),
('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()), ('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()),
)) ))

View File

@ -1,3 +1,7 @@
from bonobo.util.compat import deprecated
@deprecated
def console_run(*chain, output=True, plugins=None, strategy=None): def console_run(*chain, output=True, plugins=None, strategy=None):
from bonobo import run from bonobo import run
from bonobo.ext.console import ConsoleOutputPlugin from bonobo.ext.console import ConsoleOutputPlugin
@ -5,6 +9,7 @@ def console_run(*chain, output=True, plugins=None, strategy=None):
return run(*chain, plugins=(plugins or []) + [ConsoleOutputPlugin()] if output else [], strategy=strategy) return run(*chain, plugins=(plugins or []) + [ConsoleOutputPlugin()] if output else [], strategy=strategy)
@deprecated
def jupyter_run(*chain, plugins=None, strategy=None): def jupyter_run(*chain, plugins=None, strategy=None):
from bonobo import run from bonobo import run
from bonobo.ext.jupyter import JupyterOutputPlugin from bonobo.ext.jupyter import JupyterOutputPlugin

View File

@ -199,3 +199,12 @@ class ValueHolder:
def __invert__(self): def __invert__(self):
return ~self.value return ~self.value
def get_attribute_or_create(obj, attr, default):
try:
return getattr(obj, attr)
except AttributeError:
setattr(obj, attr, default)
return getattr(obj, attr)

17
docs/guide/plugins.rst Normal file
View File

@ -0,0 +1,17 @@
Plugins
=======
Graph level plugins
:::::::::::::::::::
Node level plugins
::::::::::::::::::
enhancers
node
-

View File

@ -1,6 +1,27 @@
Detailed roadmap Detailed roadmap
================ ================
initialize / finalize better than start / stop ?
Graph and node level plugins
::::::::::::::::::::::::::::
* Enhancers or nide-level plugins
* Graph level plugins
* Documentation
Command line interface and environment
::::::::::::::::::::::::::::::::::::::
* How do we manage environment ? .env ?
* How do we configure plugins ?
* Console run should allow console plugin as a command line argument (or silence it).
Services and Processors
:::::::::::::::::::::::
* ContextProcessors not clean
Next... Next...
::::::: :::::::
@ -10,8 +31,13 @@ Next...
* Windows break because of readme encoding. Fix in edgy. * Windows break because of readme encoding. Fix in edgy.
* bonobo init --with sqlalchemy,docker * bonobo init --with sqlalchemy,docker
* logger, vebosity level * logger, vebosity level
* Console run should allow console plugin as a command line argument (or silence it).
* ContextProcessors not clean
External libs that looks good
:::::::::::::::::::::::::::::
* dask.distributed
* mediator (event dispatcher)
Version 0.3 Version 0.3
::::::::::: :::::::::::

View File

@ -12,7 +12,7 @@ def test_write_csv_to_file(tmpdir):
writer = CsvWriter(path=filename) writer = CsvWriter(path=filename)
context = NodeExecutionContext(writer, services={'fs': fs}) context = NodeExecutionContext(writer, services={'fs': fs})
context.recv(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END) context.write(BEGIN, Bag({'foo': 'bar'}), Bag({'foo': 'baz', 'ignore': 'this'}), END)
context.start() context.start()
context.step() context.step()
@ -34,7 +34,7 @@ def test_read_csv_from_file(tmpdir):
context = CapturingNodeExecutionContext(reader, services={'fs': fs}) context = CapturingNodeExecutionContext(reader, services={'fs': fs})
context.start() context.start()
context.recv(BEGIN, Bag(), END) context.write(BEGIN, Bag(), END)
context.step() context.step()
context.stop() context.stop()

View File

@ -20,7 +20,7 @@ def test_file_writer_in_context(tmpdir, lines, output):
context = NodeExecutionContext(writer, services={'fs': fs}) context = NodeExecutionContext(writer, services={'fs': fs})
context.start() context.start()
context.recv(BEGIN, *map(Bag, lines), END) context.write(BEGIN, *map(Bag, lines), END)
for _ in range(len(lines)): for _ in range(len(lines)):
context.step() context.step()
context.stop() context.stop()
@ -48,7 +48,7 @@ def test_file_reader_in_context(tmpdir):
context = CapturingNodeExecutionContext(reader, services={'fs': fs}) context = CapturingNodeExecutionContext(reader, services={'fs': fs})
context.start() context.start()
context.recv(BEGIN, Bag(), END) context.write(BEGIN, Bag(), END)
context.step() context.step()
context.stop() context.stop()

View File

@ -13,7 +13,7 @@ def test_write_json_to_file(tmpdir):
context = NodeExecutionContext(writer, services={'fs': fs}) context = NodeExecutionContext(writer, services={'fs': fs})
context.start() context.start()
context.recv(BEGIN, Bag({'foo': 'bar'}), END) context.write(BEGIN, Bag({'foo': 'bar'}), END)
context.step() context.step()
context.stop() context.stop()
@ -34,7 +34,7 @@ def test_read_json_from_file(tmpdir):
context = CapturingNodeExecutionContext(reader, services={'fs': fs}) context = CapturingNodeExecutionContext(reader, services={'fs': fs})
context.start() context.start()
context.recv(BEGIN, Bag(), END) context.write(BEGIN, Bag(), END)
context.step() context.step()
context.stop() context.stop()

View File

@ -1,9 +1,11 @@
from unittest.mock import Mock from unittest.mock import Mock
import pickle
from bonobo import Bag from bonobo import Bag
from bonobo.constants import INHERIT_INPUT from bonobo.constants import INHERIT_INPUT
from bonobo.structs import Token
args = ('foo', 'bar', ) args = ('foo', 'bar',)
kwargs = dict(acme='corp') kwargs = dict(acme='corp')
@ -32,33 +34,56 @@ def test_inherit():
bag3 = bag.extend('c', c=3) bag3 = bag.extend('c', c=3)
bag4 = Bag('d', d=4) bag4 = Bag('d', d=4)
assert bag.args == ('a', ) assert bag.args == ('a',)
assert bag.kwargs == {'a': 1} assert bag.kwargs == {'a': 1}
assert bag.flags is () assert bag.flags is ()
assert bag2.args == ('a', 'b', ) assert bag2.args == ('a', 'b',)
assert bag2.kwargs == {'a': 1, 'b': 2} assert bag2.kwargs == {'a': 1, 'b': 2}
assert INHERIT_INPUT in bag2.flags assert INHERIT_INPUT in bag2.flags
assert bag3.args == ('a', 'c', ) assert bag3.args == ('a', 'c',)
assert bag3.kwargs == {'a': 1, 'c': 3} assert bag3.kwargs == {'a': 1, 'c': 3}
assert bag3.flags is () assert bag3.flags is ()
assert bag4.args == ('d', ) assert bag4.args == ('d',)
assert bag4.kwargs == {'d': 4} assert bag4.kwargs == {'d': 4}
assert bag4.flags is () assert bag4.flags is ()
bag4.set_parent(bag) bag4.set_parent(bag)
assert bag4.args == ('a', 'd', ) assert bag4.args == ('a', 'd',)
assert bag4.kwargs == {'a': 1, 'd': 4} assert bag4.kwargs == {'a': 1, 'd': 4}
assert bag4.flags is () assert bag4.flags is ()
bag4.set_parent(bag3) bag4.set_parent(bag3)
assert bag4.args == ('a', 'c', 'd', ) assert bag4.args == ('a', 'c', 'd',)
assert bag4.kwargs == {'a': 1, 'c': 3, 'd': 4} assert bag4.kwargs == {'a': 1, 'c': 3, 'd': 4}
assert bag4.flags is () assert bag4.flags is ()
def test_pickle():
bag1 = Bag('a', a=1)
bag2 = Bag.inherit('b', b=2, _parent=bag1)
bag3 = bag1.extend('c', c=3)
bag4 = Bag('d', d=4)
# XXX todo this probably won't work with inheriting bags if parent is not there anymore? maybe that's not true
# because the parent may be in the serialization output but we need to verify this assertion.
for bag in bag1, bag2, bag3, bag4:
pickled = pickle.dumps(bag)
unpickled = pickle.loads(pickled)
assert unpickled == bag
def test_eq_operator():
assert Bag('foo') == Bag('foo')
assert Bag('foo') != Bag('bar')
assert Bag('foo') is not Bag('foo')
assert Bag('foo') != Token('foo')
assert Token('foo') != Bag('foo')
def test_repr(): def test_repr():
bag = Bag('a', a=1) bag = Bag('a', a=1)
assert repr(bag) == "<Bag ('a', a=1)>" assert repr(bag) == "<Bag ('a', a=1)>"