Split execution module in submodules for better readability.

This commit is contained in:
Romain Dorgueil
2017-04-25 23:05:18 +02:00
parent 9b87597c24
commit ad502d7e23
16 changed files with 384 additions and 365 deletions

View File

@ -1,7 +1,7 @@
# This file has been auto-generated. # This file has been auto-generated.
# All changes will be lost, see Projectfile. # All changes will be lost, see Projectfile.
# #
# Updated at 2017-04-24 23:47:46.325867 # Updated at 2017-04-25 23:05:05.062813
PYTHON ?= $(shell which python) PYTHON ?= $(shell which python)
PYTHON_BASENAME ?= $(shell basename $(PYTHON)) PYTHON_BASENAME ?= $(shell basename $(PYTHON))

View File

@ -23,7 +23,7 @@ enable_features = {
install_requires = [ install_requires = [
'colorama >=0.3,<0.4', 'colorama >=0.3,<0.4',
'psutil >=5.2,<5.3', 'psutil >=5.2,<5.3',
'requests >=2.12,<2.13', 'requests >=2.13,<2.14',
'stevedore >=1.19,<1.20', 'stevedore >=1.19,<1.20',
] ]

View File

@ -1,340 +0,0 @@
import sys
import traceback
from functools import partial
from queue import Empty
from time import sleep
from bonobo.config import Container
from bonobo.config.processors import resolve_processors
from bonobo.constants import BEGIN, END, INHERIT_INPUT, NOT_MODIFIED
from bonobo.core.inputs import Input
from bonobo.core.statistics import WithStatistics
from bonobo.errors import InactiveReadableError
from bonobo.structs.bags import Bag, ErrorBag
from bonobo.util.iterators import ensure_tuple
from bonobo.util.objects import Wrapper
__all__ = [
'GraphExecutionContext',
'NodeExecutionContext',
'PluginExecutionContext',
]
class GraphExecutionContext:
@property
def started(self):
return any(node.started for node in self.nodes)
@property
def stopped(self):
return all(node.started and node.stopped for node in self.nodes)
@property
def alive(self):
return any(node.alive for node in self.nodes)
def __init__(self, graph, plugins=None, services=None):
self.graph = graph
self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph.nodes]
self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()]
self.services = Container(services) if services else Container()
for i, component_context in enumerate(self):
try:
component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)]
except KeyError:
continue
component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True)
component_context.input.on_end = partial(component_context.send, END, _control=True)
component_context.input.on_finalize = partial(component_context.stop)
def __getitem__(self, item):
return self.nodes[item]
def __len__(self):
return len(self.nodes)
def __iter__(self):
yield from self.nodes
def recv(self, *messages):
"""Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in
our graph."""
for i in self.graph.outputs_of(BEGIN):
for message in messages:
self[i].recv(message)
def start(self):
# todo use strategy
for node in self.nodes:
node.start()
def loop(self):
# todo use strategy
for node in self.nodes:
node.loop()
def stop(self):
# todo use strategy
for node in self.nodes:
node.stop()
class LoopingExecutionContext(Wrapper):
alive = True
PERIOD = 0.25
@property
def state(self):
return self._started, self._stopped
@property
def started(self):
return self._started
@property
def stopped(self):
return self._stopped
def __init__(self, wrapped, parent):
super().__init__(wrapped)
self.parent = parent
self._started, self._stopped, self._context, self._stack = False, False, None, []
def start(self):
assert self.state == (False,
False), ('{}.start() can only be called on a new node.').format(type(self).__name__)
assert self._context is None
self._started = True
try:
self._context = self.parent.services.args_for(self.wrapped) if self.parent else ()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise
for processor in resolve_processors(self.wrapped):
try:
_processed = processor(self.wrapped, self, *self._context)
_append_to_context = next(_processed)
if _append_to_context is not None:
self._context += ensure_tuple(_append_to_context)
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise
self._stack.append(_processed)
def loop(self):
"""Generic loop. A bit boring. """
while self.alive:
self.step()
sleep(self.PERIOD)
def step(self):
"""Left as an exercise for the children."""
raise NotImplementedError('Abstract.')
def stop(self):
assert self._started, ('{}.stop() can only be called on a previously started node.').format(type(self).__name__)
if self._stopped:
return
assert self._context is not None
self._stopped = True
while len(self._stack):
processor = self._stack.pop()
try:
# todo yield from ? how to ?
next(processor)
except StopIteration as exc:
# This is normal, and wanted.
pass
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise
else:
# No error ? We should have had StopIteration ...
raise RuntimeError('Context processors should not yield more than once.')
def handle_error(self, exc, trace):
"""
Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception
or somehow make me think it is an exception, I'll handle it.
:param exc: the culprit
:param trace: Hercule Poirot's logbook.
:return: to hell
"""
from colorama import Fore, Style
print(
Style.BRIGHT,
Fore.RED,
'\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped),
Style.RESET_ALL,
sep='',
file=sys.stderr,
)
print(trace)
class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
"""
todo: make the counter dependant of parent context?
"""
@property
def alive(self):
"""todo check if this is right, and where it is used"""
return self.input.alive and self._started and not self._stopped
def __init__(self, wrapped, parent):
LoopingExecutionContext.__init__(self, wrapped, parent)
WithStatistics.__init__(self, 'in', 'out', 'err')
self.input = Input()
self.outputs = []
def __str__(self):
return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip()
def __repr__(self):
return '<' + self.__str__() + '>'
def recv(self, *messages):
"""
Push a message list to this context's input queue.
:param mixed value: message
"""
for message in messages:
self.input.put(message)
def send(self, value, _control=False):
"""
Sends a message to all of this context's outputs.
:param mixed value: message
:param _control: if true, won't count in statistics.
"""
if not _control:
self.increment('out')
for output in self.outputs:
output.put(value)
def get(self):
"""
Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed.
"""
row = self.input.get(timeout=self.PERIOD)
self.increment('in')
return row
def loop(self):
while True:
try:
self.step()
except KeyboardInterrupt:
raise
except InactiveReadableError:
break
except Empty:
sleep(self.PERIOD)
continue
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
def step(self):
# Pull data from the first available input channel.
"""Runs a transformation callable with given args/kwargs and flush the result into the right
output channel."""
input_bag = self.get()
# todo add timer
self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context))
def handle_results(self, input_bag, results):
# self._exec_time += timer.duration
# Put data onto output channels
try:
results = _iter(results)
except TypeError: # not an iterator
if results:
if isinstance(results, ErrorBag):
results.apply(self.handle_error)
else:
self.send(_resolve(input_bag, results))
else:
# case with no result, an execution went through anyway, use for stats.
# self._exec_count += 1
pass
else:
while True: # iterator
try:
output = next(results)
except StopIteration:
break
else:
if isinstance(output, ErrorBag):
output.apply(self.handle_error)
else:
self.send(_resolve(input_bag, output))
class PluginExecutionContext(LoopingExecutionContext):
PERIOD = 0.5
def __init__(self, wrapped, parent):
# Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure
# plugins, for example if it depends on an external service.
super().__init__(wrapped(self), parent)
def start(self):
super().start()
try:
self.wrapped.initialize()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
def shutdown(self):
try:
self.wrapped.finalize()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
finally:
self.alive = False
def step(self):
try:
self.wrapped.run()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
def _iter(mixed):
if isinstance(mixed, (dict, list, str)):
raise TypeError(type(mixed).__name__)
return iter(mixed)
def _resolve(input_bag, output):
# NotModified means to send the input unmodified to output.
if output is NOT_MODIFIED:
return input_bag
# If it does not look like a bag, let's create one for easier manipulation
if hasattr(output, 'apply'):
# Already a bag? Check if we need to set parent.
if INHERIT_INPUT in output.flags:
output.set_parent(input_bag)
else:
# Not a bag? Let's encapsulate it.
output = Bag(output)
return output

View File

@ -0,0 +1,9 @@
from bonobo.execution.graph import GraphExecutionContext, NodeExecutionContext, PluginExecutionContext
__all__ = [
'GraphExecutionContext',
'NodeExecutionContext',
'PluginExecutionContext',
]

105
bonobo/execution/base.py Normal file
View File

@ -0,0 +1,105 @@
import sys
import traceback
from time import sleep
from bonobo.config.processors import resolve_processors
from bonobo.util.iterators import ensure_tuple
from bonobo.util.objects import Wrapper
class LoopingExecutionContext(Wrapper):
alive = True
PERIOD = 0.25
@property
def state(self):
return self._started, self._stopped
@property
def started(self):
return self._started
@property
def stopped(self):
return self._stopped
def __init__(self, wrapped, parent):
super().__init__(wrapped)
self.parent = parent
self._started, self._stopped, self._context, self._stack = False, False, None, []
def start(self):
assert self.state == (False,
False), ('{}.start() can only be called on a new node.').format(type(self).__name__)
assert self._context is None
self._started = True
try:
self._context = self.parent.services.args_for(self.wrapped) if self.parent else ()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise
for processor in resolve_processors(self.wrapped):
try:
_processed = processor(self.wrapped, self, *self._context)
_append_to_context = next(_processed)
if _append_to_context is not None:
self._context += ensure_tuple(_append_to_context)
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise
self._stack.append(_processed)
def loop(self):
"""Generic loop. A bit boring. """
while self.alive:
self.step()
sleep(self.PERIOD)
def step(self):
"""Left as an exercise for the children."""
raise NotImplementedError('Abstract.')
def stop(self):
assert self._started, ('{}.stop() can only be called on a previously started node.').format(type(self).__name__)
if self._stopped:
return
assert self._context is not None
self._stopped = True
while len(self._stack):
processor = self._stack.pop()
try:
# todo yield from ? how to ?
next(processor)
except StopIteration as exc:
# This is normal, and wanted.
pass
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise
else:
# No error ? We should have had StopIteration ...
raise RuntimeError('Context processors should not yield more than once.')
def handle_error(self, exc, trace):
"""
Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception
or somehow make me think it is an exception, I'll handle it.
:param exc: the culprit
:param trace: Hercule Poirot's logbook.
:return: to hell
"""
from colorama import Fore, Style
print(
Style.BRIGHT,
Fore.RED,
'\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped),
Style.RESET_ALL,
sep='',
file=sys.stderr,
)
print(trace)

68
bonobo/execution/graph.py Normal file
View File

@ -0,0 +1,68 @@
from functools import partial
from bonobo.config.services import Container
from bonobo.constants import BEGIN, END
from bonobo.execution.node import NodeExecutionContext
from bonobo.execution.plugin import PluginExecutionContext
class GraphExecutionContext:
@property
def started(self):
return any(node.started for node in self.nodes)
@property
def stopped(self):
return all(node.started and node.stopped for node in self.nodes)
@property
def alive(self):
return any(node.alive for node in self.nodes)
def __init__(self, graph, plugins=None, services=None):
self.graph = graph
self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph.nodes]
self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()]
self.services = Container(services) if services else Container()
for i, component_context in enumerate(self):
try:
component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)]
except KeyError:
continue
component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True)
component_context.input.on_end = partial(component_context.send, END, _control=True)
component_context.input.on_finalize = partial(component_context.stop)
def __getitem__(self, item):
return self.nodes[item]
def __len__(self):
return len(self.nodes)
def __iter__(self):
yield from self.nodes
def recv(self, *messages):
"""Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in
our graph."""
for i in self.graph.outputs_of(BEGIN):
for message in messages:
self[i].recv(message)
def start(self):
# todo use strategy
for node in self.nodes:
node.start()
def loop(self):
# todo use strategy
for node in self.nodes:
node.loop()
def stop(self):
# todo use strategy
for node in self.nodes:
node.stop()

133
bonobo/execution/node.py Normal file
View File

@ -0,0 +1,133 @@
import traceback
from queue import Empty
from time import sleep
from bonobo.structs.bags import Bag, ErrorBag
from bonobo.constants import INHERIT_INPUT, NOT_MODIFIED
from bonobo.core.inputs import Input
from bonobo.core.statistics import WithStatistics
from bonobo.errors import InactiveReadableError
from bonobo.execution.base import LoopingExecutionContext
from bonobo.util.iterators import iter_if_not_sequence
class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
"""
todo: make the counter dependant of parent context?
"""
@property
def alive(self):
"""todo check if this is right, and where it is used"""
return self.input.alive and self._started and not self._stopped
def __init__(self, wrapped, parent):
LoopingExecutionContext.__init__(self, wrapped, parent)
WithStatistics.__init__(self, 'in', 'out', 'err')
self.input = Input()
self.outputs = []
def __str__(self):
return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip()
def __repr__(self):
return '<' + self.__str__() + '>'
def recv(self, *messages):
"""
Push a message list to this context's input queue.
:param mixed value: message
"""
for message in messages:
self.input.put(message)
def send(self, value, _control=False):
"""
Sends a message to all of this context's outputs.
:param mixed value: message
:param _control: if true, won't count in statistics.
"""
if not _control:
self.increment('out')
for output in self.outputs:
output.put(value)
def get(self):
"""
Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed.
"""
row = self.input.get(timeout=self.PERIOD)
self.increment('in')
return row
def loop(self):
while True:
try:
self.step()
except KeyboardInterrupt:
raise
except InactiveReadableError:
break
except Empty:
sleep(self.PERIOD)
continue
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
def step(self):
# Pull data from the first available input channel.
"""Runs a transformation callable with given args/kwargs and flush the result into the right
output channel."""
input_bag = self.get()
# todo add timer
self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context))
def handle_results(self, input_bag, results):
# self._exec_time += timer.duration
# Put data onto output channels
try:
results = iter_if_not_sequence(results)
except TypeError: # not an iterator
if results:
if isinstance(results, ErrorBag):
results.apply(self.handle_error)
else:
self.send(_resolve(input_bag, results))
else:
# case with no result, an execution went through anyway, use for stats.
# self._exec_count += 1
pass
else:
while True: # iterator
try:
output = next(results)
except StopIteration:
break
else:
if isinstance(output, ErrorBag):
output.apply(self.handle_error)
else:
self.send(_resolve(input_bag, output))
def _resolve(input_bag, output):
# NotModified means to send the input unmodified to output.
if output is NOT_MODIFIED:
return input_bag
# If it does not look like a bag, let's create one for easier manipulation
if hasattr(output, 'apply'):
# Already a bag? Check if we need to set parent.
if INHERIT_INPUT in output.flags:
output.set_parent(input_bag)
else:
# Not a bag? Let's encapsulate it.
output = Bag(output)
return output

View File

@ -0,0 +1,34 @@
import traceback
from bonobo.execution.base import LoopingExecutionContext
class PluginExecutionContext(LoopingExecutionContext):
PERIOD = 0.5
def __init__(self, wrapped, parent):
# Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure
# plugins, for example if it depends on an external service.
super().__init__(wrapped(self), parent)
def start(self):
super().start()
try:
self.wrapped.initialize()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
def shutdown(self):
try:
self.wrapped.finalize()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
finally:
self.alive = False
def step(self):
try:
self.wrapped.run()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())

View File

@ -1,4 +1,4 @@
from bonobo.execution import GraphExecutionContext from bonobo.execution.graph import GraphExecutionContext
class Strategy: class Strategy:

View File

@ -20,4 +20,10 @@ def force_iterator(mixed):
def ensure_tuple(tuple_or_mixed): def ensure_tuple(tuple_or_mixed):
if isinstance(tuple_or_mixed, tuple): if isinstance(tuple_or_mixed, tuple):
return tuple_or_mixed return tuple_or_mixed
return (tuple_or_mixed,) return (tuple_or_mixed,)
def iter_if_not_sequence(mixed):
if isinstance(mixed, (dict, list, str)):
raise TypeError(type(mixed).__name__)
return iter(mixed)

View File

@ -1,6 +1,6 @@
from unittest.mock import MagicMock from unittest.mock import MagicMock
from bonobo.execution import NodeExecutionContext from bonobo.execution.node import NodeExecutionContext
class CapturingNodeExecutionContext(NodeExecutionContext): class CapturingNodeExecutionContext(NodeExecutionContext):

View File

@ -40,36 +40,40 @@ setup(
name='bonobo', name='bonobo',
description='Bonobo', description='Bonobo',
license='Apache License, Version 2.0', license='Apache License, Version 2.0',
install_requires=['colorama >=0.3,<0.4', 'psutil >=5.2,<5.3', 'requests >=2.12,<2.13', 'stevedore >=1.19,<1.20'], install_requires=[
'colorama >=0.3,<0.4', 'psutil >=5.2,<5.3', 'requests >=2.13,<2.14',
'stevedore >=1.19,<1.20'
],
version=version, version=version,
long_description=read('README.rst'), long_description=read('README.rst'),
classifiers=read('classifiers.txt', tolines), classifiers=read('classifiers.txt', tolines),
packages=find_packages(exclude=['ez_setup', 'example', 'test']), packages=find_packages(exclude=['ez_setup', 'example', 'test']),
include_package_data=True, include_package_data=True,
data_files=[ data_files=[('share/jupyter/nbextensions/bonobo-jupyter', [
( 'bonobo/ext/jupyter/static/extension.js',
'share/jupyter/nbextensions/bonobo-jupyter', [ 'bonobo/ext/jupyter/static/index.js',
'bonobo/ext/jupyter/static/extension.js', 'bonobo/ext/jupyter/static/index.js', 'bonobo/ext/jupyter/static/index.js.map'
'bonobo/ext/jupyter/static/index.js.map' ])],
]
)
],
extras_require={ extras_require={
'dev': [ 'dev': [
'coverage >=4.3,<4.4', 'mock >=2.0,<2.1', 'nose >=1.3,<1.4', 'pylint >=1.6,<1.7', 'pytest >=3,<4', 'coverage >=4.3,<4.4', 'mock >=2.0,<2.1', 'pylint >=1,<2',
'pytest-cov >=2.4,<2.5', 'pytest-timeout >=1.2,<1.3', 'sphinx', 'sphinx_rtd_theme', 'yapf' 'pytest >=3,<4', 'pytest-cov >=2,<3', 'pytest-timeout >=1,<2',
'sphinx', 'sphinx_rtd_theme', 'yapf'
], ],
'jupyter': ['jupyter >=1.0,<1.1', 'ipywidgets >=6.0.0.beta5'] 'jupyter': ['jupyter >=1.0,<1.1', 'ipywidgets >=6.0.0.beta5']
}, },
entry_points={ entry_points={
'bonobo.commands': [ 'bonobo.commands': [
'init = bonobo.commands.init:register', 'run = bonobo.commands.run:register', 'init = bonobo.commands.init:register',
'run = bonobo.commands.run:register',
'version = bonobo.commands.version:register' 'version = bonobo.commands.version:register'
], ],
'console_scripts': ['bonobo = bonobo.commands:entrypoint'], 'console_scripts': ['bonobo = bonobo.commands:entrypoint'],
'edgy.project.features': ['bonobo = ' 'edgy.project.features':
'bonobo.ext.edgy.project.feature:BonoboFeature'] ['bonobo = '
'bonobo.ext.edgy.project.feature:BonoboFeature']
}, },
url='https://www.bonobo-project.org/', url='https://www.bonobo-project.org/',
download_url='https://github.com/python-bonobo/bonobo/tarball/{version}'.format(version=version), download_url=
) 'https://github.com/python-bonobo/bonobo/tarball/{version}'.format(
version=version), )

View File

@ -2,7 +2,7 @@ import pytest
from bonobo import Bag, CsvReader, CsvWriter from bonobo import Bag, CsvReader, CsvWriter
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
from bonobo.execution import NodeExecutionContext from bonobo.execution.node import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext

View File

@ -2,7 +2,7 @@ import pytest
from bonobo import Bag, FileReader, FileWriter from bonobo import Bag, FileReader, FileWriter
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
from bonobo.execution import NodeExecutionContext from bonobo.execution.node import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext

View File

@ -2,7 +2,7 @@ import pytest
from bonobo import Bag, JsonReader, JsonWriter from bonobo import Bag, JsonReader, JsonWriter
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
from bonobo.execution import NodeExecutionContext from bonobo.execution.node import NodeExecutionContext
from bonobo.util.testing import CapturingNodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext

View File

@ -1,7 +1,7 @@
from bonobo import Graph, NaiveStrategy, Bag from bonobo import Graph, NaiveStrategy, Bag
from bonobo.config.processors import contextual from bonobo.config.processors import contextual
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
from bonobo.execution import GraphExecutionContext from bonobo.execution.graph import GraphExecutionContext
def generate_integers(): def generate_integers():