Merge pull request #45 from hartym/services_init

Trying to fix unending transformations on start() error. (#38)
This commit is contained in:
Romain Dorgueil
2017-05-01 06:52:02 -07:00
committed by GitHub
17 changed files with 164 additions and 102 deletions

1
.gitignore vendored
View File

@ -25,6 +25,7 @@
/.idea /.idea
/.release /.release
/bonobo.iml /bonobo.iml
/bonobo/examples/work_in_progress/
/bonobo/ext/jupyter/js/node_modules/ /bonobo/ext/jupyter/js/node_modules/
/build/ /build/
/coverage.xml /coverage.xml

View File

@ -88,7 +88,15 @@ def open_fs(fs_url, *args, **kwargs):
# bonobo.basics # bonobo.basics
register_api_group(Limit, PrettyPrint, Tee, count, identity, noop, pprint, ) register_api_group(
Limit,
PrettyPrint,
Tee,
count,
identity,
noop,
pprint,
)
# bonobo.io # bonobo.io
register_api_group(CsvReader, CsvWriter, FileReader, FileWriter, JsonReader, JsonWriter) register_api_group(CsvReader, CsvWriter, FileReader, FileWriter, JsonReader, JsonWriter)
@ -116,4 +124,3 @@ def get_examples_path(*pathsegments):
@register_api @register_api
def open_examples_fs(*pathsegments): def open_examples_fs(*pathsegments):
return open_fs(get_examples_path(*pathsegments)) return open_fs(get_examples_path(*pathsegments))

View File

@ -66,6 +66,7 @@ pprint = Tee(_pprint)
def PrettyPrint(title_keys=('title', 'name', 'id'), print_values=True, sort=True): def PrettyPrint(title_keys=('title', 'name', 'id'), print_values=True, sort=True):
from bonobo.constants import NOT_MODIFIED from bonobo.constants import NOT_MODIFIED
def _pprint(*args, **kwargs): def _pprint(*args, **kwargs):
nonlocal title_keys, sort, print_values nonlocal title_keys, sort, print_values

View File

@ -0,0 +1,4 @@
[style]
based_on_style = pep8
column_limit = 74
dedent_closing_brackets = true

View File

@ -25,7 +25,9 @@ from bonobo.ext.opendatasoft import OpenDataSoftAPI
try: try:
import pycountry import pycountry
except ImportError as exc: except ImportError as exc:
raise ImportError('You must install package "pycountry" to run this example.') from exc raise ImportError(
'You must install package "pycountry" to run this example.'
) from exc
API_DATASET = 'fablabs-in-the-world' API_DATASET = 'fablabs-in-the-world'
API_NETLOC = 'datanova.laposte.fr' API_NETLOC = 'datanova.laposte.fr'
@ -57,20 +59,41 @@ def display(row):
address = list( address = list(
filter( filter(
None, ( None, (
' '.join(filter(None, (row.get('postal_code', None), row.get('city', None)))), row.get('county', None), ' '.join(
row.get('country'), filter(
None, (
row.get('postal_code', None),
row.get('city', None)
)
)
), row.get('county', None), row.get('country'),
) )
) )
) )
print(' - {}address{}: {address}'.format(Fore.BLUE, Style.RESET_ALL, address=', '.join(address))) print(
print(' - {}links{}: {links}'.format(Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links']))) ' - {}address{}: {address}'.
print(' - {}geometry{}: {geometry}'.format(Fore.BLUE, Style.RESET_ALL, **row)) format(Fore.BLUE, Style.RESET_ALL, address=', '.join(address))
print(' - {}source{}: {source}'.format(Fore.BLUE, Style.RESET_ALL, source='datanova/' + API_DATASET)) )
print(
' - {}links{}: {links}'.
format(Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links']))
)
print(
' - {}geometry{}: {geometry}'.
format(Fore.BLUE, Style.RESET_ALL, **row)
)
print(
' - {}source{}: {source}'.format(
Fore.BLUE, Style.RESET_ALL, source='datanova/' + API_DATASET
)
)
graph = bonobo.Graph( graph = bonobo.Graph(
OpenDataSoftAPI(dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'), OpenDataSoftAPI(
dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'
),
normalize, normalize,
filter_france, filter_france,
bonobo.Tee(display), bonobo.Tee(display),

View File

@ -1,9 +1,11 @@
import bonobo import bonobo
from bonobo.commands.run import get_default_services from bonobo.commands.run import get_default_services
def get_fields(row): def get_fields(row):
return row['fields'] return row['fields']
graph = bonobo.Graph( graph = bonobo.Graph(
bonobo.JsonReader(path='datasets/theaters.json'), bonobo.JsonReader(path='datasets/theaters.json'),
get_fields, get_fields,

View File

@ -6,6 +6,6 @@ graph = bonobo.Graph(
) )
if __name__ == '__main__': if __name__ == '__main__':
bonobo.run(graph, services={ bonobo.run(
'fs': bonobo.open_examples_fs('datasets') graph, services={'fs': bonobo.open_examples_fs('datasets')}
}) )

View File

@ -12,6 +12,6 @@ graph = bonobo.Graph(
) )
if __name__ == '__main__': if __name__ == '__main__':
bonobo.run(graph, services={ bonobo.run(
'fs': bonobo.open_examples_fs('datasets') graph, services={'fs': bonobo.open_examples_fs('datasets')}
}) )

View File

@ -22,6 +22,6 @@ graph = bonobo.Graph(
) )
if __name__ == '__main__': if __name__ == '__main__':
bonobo.run(graph, services={ bonobo.run(
'fs': bonobo.open_examples_fs('datasets') graph, services={'fs': bonobo.open_examples_fs('datasets')}
}) )

View File

@ -1,9 +1,9 @@
import sys
import traceback import traceback
from time import sleep from time import sleep
from bonobo.config import Container from bonobo.config import Container
from bonobo.config.processors import resolve_processors from bonobo.config.processors import resolve_processors
from bonobo.util.errors import print_error
from bonobo.util.iterators import ensure_tuple from bonobo.util.iterators import ensure_tuple
from bonobo.util.objects import Wrapper from bonobo.util.objects import Wrapper
@ -43,16 +43,13 @@ class LoopingExecutionContext(Wrapper):
False), ('{}.start() can only be called on a new node.').format(type(self).__name__) False), ('{}.start() can only be called on a new node.').format(type(self).__name__)
assert self._context is None assert self._context is None
self._started = True self._started = True
try:
if self.parent: if self.parent:
self._context = self.parent.services.args_for(self.wrapped) self._context = self.parent.services.args_for(self.wrapped)
elif self.services: elif self.services:
self._context = self.services.args_for(self.wrapped) self._context = self.services.args_for(self.wrapped)
else: else:
self._context = () self._context = ()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise
for processor in resolve_processors(self.wrapped): for processor in resolve_processors(self.wrapped):
try: try:
@ -80,41 +77,22 @@ class LoopingExecutionContext(Wrapper):
if self._stopped: if self._stopped:
return return
assert self._context is not None
self._stopped = True self._stopped = True
while len(self._stack): if self._context is not None:
processor = self._stack.pop() while len(self._stack):
try: processor = self._stack.pop()
# todo yield from ? how to ? try:
next(processor) # todo yield from ? how to ?
except StopIteration as exc: next(processor)
# This is normal, and wanted. except StopIteration as exc:
pass # This is normal, and wanted.
except Exception as exc: # pylint: disable=broad-except pass
self.handle_error(exc, traceback.format_exc()) except Exception as exc: # pylint: disable=broad-except
raise self.handle_error(exc, traceback.format_exc())
else: raise
# No error ? We should have had StopIteration ... else:
raise RuntimeError('Context processors should not yield more than once.') # No error ? We should have had StopIteration ...
raise RuntimeError('Context processors should not yield more than once.')
def handle_error(self, exc, trace): def handle_error(self, exc, trace):
""" return print_error(exc, trace, context=self.wrapped)
Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception
or somehow make me think it is an exception, I'll handle it.
:param exc: the culprit
:param trace: Hercule Poirot's logbook.
:return: to hell
"""
from colorama import Fore, Style
print(
Style.BRIGHT,
Fore.RED,
'\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped),
Style.RESET_ALL,
sep='',
file=sys.stderr,
)
print(trace)

View File

@ -25,15 +25,15 @@ class GraphExecutionContext:
self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()] self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()]
self.services = Container(services) if services else Container() self.services = Container(services) if services else Container()
for i, component_context in enumerate(self): for i, node_context in enumerate(self):
try: try:
component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)] node_context.outputs = [self[j].input for j in self.graph.outputs_of(i)]
except KeyError: except KeyError:
continue continue
component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True) node_context.input.on_begin = partial(node_context.send, BEGIN, _control=True)
component_context.input.on_end = partial(component_context.send, END, _control=True) node_context.input.on_end = partial(node_context.send, END, _control=True)
component_context.input.on_finalize = partial(component_context.stop) node_context.input.on_finalize = partial(node_context.stop)
def __getitem__(self, item): def __getitem__(self, item):
return self.nodes[item] return self.nodes[item]

View File

@ -7,7 +7,8 @@ from bonobo.core.inputs import Input
from bonobo.core.statistics import WithStatistics from bonobo.core.statistics import WithStatistics
from bonobo.errors import InactiveReadableError from bonobo.errors import InactiveReadableError
from bonobo.execution.base import LoopingExecutionContext from bonobo.execution.base import LoopingExecutionContext
from bonobo.structs.bags import Bag, ErrorBag from bonobo.structs.bags import Bag
from bonobo.util.errors import is_error
from bonobo.util.iterators import iter_if_not_sequence from bonobo.util.iterators import iter_if_not_sequence
@ -32,7 +33,13 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip() return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip()
def __repr__(self): def __repr__(self):
return '<' + self.__str__() + '>' stats = self.get_statistics_as_string().strip()
return '<{}({}{}){}>'.format(
type(self).__name__,
'+' if self.alive else '',
self.__name__,
(' ' + stats) if stats else '',
)
def recv(self, *messages): def recv(self, *messages):
""" """
@ -116,10 +123,6 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
self.push(_resolve(input_bag, result)) self.push(_resolve(input_bag, result))
def is_error(bag):
return isinstance(bag, ErrorBag)
def _resolve(input_bag, output): def _resolve(input_bag, output):
# NotModified means to send the input unmodified to output. # NotModified means to send the input unmodified to output.
if output is NOT_MODIFIED: if output is NOT_MODIFIED:

View File

@ -22,5 +22,3 @@ class JupyterOutputPlugin(Plugin):
self.widget.value = [repr(node) for node in self.context.parent.nodes] self.widget.value = [repr(node) for node in self.context.parent.nodes]
finalize = run finalize = run

View File

@ -1,10 +1,12 @@
import time import time
import traceback
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
from bonobo.strategies.base import Strategy from bonobo.strategies.base import Strategy
from bonobo.structs.bags import Bag from bonobo.structs.bags import Bag
from bonobo.util.errors import print_error
class ExecutorStrategy(Strategy): class ExecutorStrategy(Strategy):
@ -29,18 +31,32 @@ class ExecutorStrategy(Strategy):
for plugin_context in context.plugins: for plugin_context in context.plugins:
def _runner(plugin_context=plugin_context): def _runner(plugin_context=plugin_context):
plugin_context.start() try:
plugin_context.loop() plugin_context.start()
plugin_context.stop() plugin_context.loop()
plugin_context.stop()
except Exception as exc:
print_error(exc, traceback.format_exc(), prefix='Error in plugin context', context=plugin_context)
futures.append(executor.submit(_runner)) futures.append(executor.submit(_runner))
for node_context in context.nodes: for node_context in context.nodes:
def _runner(node_context=node_context): def _runner(node_context=node_context):
node_context.start() try:
node_context.loop() node_context.start()
node_context.stop() except Exception as exc:
print_error(
exc, traceback.format_exc(), prefix='Could not start node context', context=node_context
)
node_context.input.on_end()
else:
node_context.loop()
try:
node_context.stop()
except Exception as exc:
print_error(exc, traceback.format_exc(), prefix='Could not stop node context', context=node_context)
futures.append(executor.submit(_runner)) futures.append(executor.submit(_runner))

31
bonobo/util/errors.py Normal file
View File

@ -0,0 +1,31 @@
import sys
from bonobo.structs.bags import ErrorBag
def is_error(bag):
return isinstance(bag, ErrorBag)
def print_error(exc, trace, context=None, prefix=''):
"""
Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception
or somehow make me think it is an exception, I'll handle it.
:param exc: the culprit
:param trace: Hercule Poirot's logbook.
:return: to hell
"""
from colorama import Fore, Style
print(
Style.BRIGHT,
Fore.RED,
'\U0001F4A3 {}{}{}'.format(
(prefix + ': ') if prefix else '', type(exc).__name__, ' in {!r}'.format(context) if context else ''
),
Style.RESET_ALL,
sep='',
file=sys.stderr,
)
print(trace)

View File

@ -45,39 +45,37 @@ setup(
description='Bonobo', description='Bonobo',
license='Apache License, Version 2.0', license='Apache License, Version 2.0',
install_requires=[ install_requires=[
'colorama >=0.3,<1.0', 'fs >=2.0,<3.0', 'psutil >=5.2,<6.0', 'colorama >=0.3,<1.0', 'fs >=2.0,<3.0', 'psutil >=5.2,<6.0', 'requests >=2.0,<3.0', 'stevedore >=1.21,<2.0'
'requests >=2.0,<3.0', 'stevedore >=1.21,<2.0'
], ],
version=version, version=version,
long_description=read('README.rst'), long_description=read('README.rst'),
classifiers=read('classifiers.txt', tolines), classifiers=read('classifiers.txt', tolines),
packages=find_packages(exclude=['ez_setup', 'example', 'test']), packages=find_packages(exclude=['ez_setup', 'example', 'test']),
include_package_data=True, include_package_data=True,
data_files=[('share/jupyter/nbextensions/bonobo-jupyter', [ data_files=[
'bonobo/ext/jupyter/static/extension.js', (
'bonobo/ext/jupyter/static/index.js', 'share/jupyter/nbextensions/bonobo-jupyter', [
'bonobo/ext/jupyter/static/index.js.map' 'bonobo/ext/jupyter/static/extension.js', 'bonobo/ext/jupyter/static/index.js',
])], 'bonobo/ext/jupyter/static/index.js.map'
]
)
],
extras_require={ extras_require={
'dev': [ 'dev': [
'coverage >=4,<5', 'pylint >=1,<2', 'pytest >=3,<4', 'coverage >=4,<5', 'pylint >=1,<2', 'pytest >=3,<4', 'pytest-cov >=2,<3', 'pytest-timeout >=1,<2', 'sphinx',
'pytest-cov >=2,<3', 'pytest-timeout >=1,<2', 'sphinx',
'sphinx_rtd_theme', 'yapf' 'sphinx_rtd_theme', 'yapf'
], ],
'jupyter': ['jupyter >=1.0,<1.1', 'ipywidgets >=6.0.0.beta5'] 'jupyter': ['jupyter >=1.0,<1.1', 'ipywidgets >=6.0.0.beta5']
}, },
entry_points={ entry_points={
'bonobo.commands': [ 'bonobo.commands': [
'init = bonobo.commands.init:register', 'init = bonobo.commands.init:register', 'run = bonobo.commands.run:register',
'run = bonobo.commands.run:register',
'version = bonobo.commands.version:register' 'version = bonobo.commands.version:register'
], ],
'console_scripts': ['bonobo = bonobo.commands:entrypoint'], 'console_scripts': ['bonobo = bonobo.commands:entrypoint'],
'edgy.project.features': 'edgy.project.features': ['bonobo = '
['bonobo = ' 'bonobo.ext.edgy.project.feature:BonoboFeature']
'bonobo.ext.edgy.project.feature:BonoboFeature']
}, },
url='https://www.bonobo-project.org/', url='https://www.bonobo-project.org/',
download_url= download_url='https://github.com/python-bonobo/bonobo/tarball/{version}'.format(version=version),
'https://github.com/python-bonobo/bonobo/tarball/{version}'.format( )
version=version), )