Trying to fix unending transformations on start() error.

This commit is contained in:
Romain Dorgueil
2017-05-01 15:12:48 +02:00
parent bd0b9a3098
commit 474999a87e
6 changed files with 90 additions and 64 deletions

1
.gitignore vendored
View File

@ -25,6 +25,7 @@
/.idea /.idea
/.release /.release
/bonobo.iml /bonobo.iml
/bonobo/examples/work_in_progress/
/bonobo/ext/jupyter/js/node_modules/ /bonobo/ext/jupyter/js/node_modules/
/build/ /build/
/coverage.xml /coverage.xml

View File

@ -1,9 +1,9 @@
import sys
import traceback import traceback
from time import sleep from time import sleep
from bonobo.config import Container from bonobo.config import Container
from bonobo.config.processors import resolve_processors from bonobo.config.processors import resolve_processors
from bonobo.util.errors import print_error
from bonobo.util.iterators import ensure_tuple from bonobo.util.iterators import ensure_tuple
from bonobo.util.objects import Wrapper from bonobo.util.objects import Wrapper
@ -43,16 +43,13 @@ class LoopingExecutionContext(Wrapper):
False), ('{}.start() can only be called on a new node.').format(type(self).__name__) False), ('{}.start() can only be called on a new node.').format(type(self).__name__)
assert self._context is None assert self._context is None
self._started = True self._started = True
try:
if self.parent: if self.parent:
self._context = self.parent.services.args_for(self.wrapped) self._context = self.parent.services.args_for(self.wrapped)
elif self.services: elif self.services:
self._context = self.services.args_for(self.wrapped) self._context = self.services.args_for(self.wrapped)
else: else:
self._context = () self._context = ()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise
for processor in resolve_processors(self.wrapped): for processor in resolve_processors(self.wrapped):
try: try:
@ -80,41 +77,22 @@ class LoopingExecutionContext(Wrapper):
if self._stopped: if self._stopped:
return return
assert self._context is not None
self._stopped = True self._stopped = True
while len(self._stack): if self._context is not None:
processor = self._stack.pop() while len(self._stack):
try: processor = self._stack.pop()
# todo yield from ? how to ? try:
next(processor) # todo yield from ? how to ?
except StopIteration as exc: next(processor)
# This is normal, and wanted. except StopIteration as exc:
pass # This is normal, and wanted.
except Exception as exc: # pylint: disable=broad-except pass
self.handle_error(exc, traceback.format_exc()) except Exception as exc: # pylint: disable=broad-except
raise self.handle_error(exc, traceback.format_exc())
else: raise
# No error ? We should have had StopIteration ... else:
raise RuntimeError('Context processors should not yield more than once.') # No error ? We should have had StopIteration ...
raise RuntimeError('Context processors should not yield more than once.')
def handle_error(self, exc, trace): def handle_error(self, exc, trace):
""" return print_error(exc, trace, context=self.wrapped)
Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception
or somehow make me think it is an exception, I'll handle it.
:param exc: the culprit
:param trace: Hercule Poirot's logbook.
:return: to hell
"""
from colorama import Fore, Style
print(
Style.BRIGHT,
Fore.RED,
'\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped),
Style.RESET_ALL,
sep='',
file=sys.stderr,
)
print(trace)

View File

@ -25,15 +25,15 @@ class GraphExecutionContext:
self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()] self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()]
self.services = Container(services) if services else Container() self.services = Container(services) if services else Container()
for i, component_context in enumerate(self): for i, node_context in enumerate(self):
try: try:
component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)] node_context.outputs = [self[j].input for j in self.graph.outputs_of(i)]
except KeyError: except KeyError:
continue continue
component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True) node_context.input.on_begin = partial(node_context.send, BEGIN, _control=True)
component_context.input.on_end = partial(component_context.send, END, _control=True) node_context.input.on_end = partial(node_context.send, END, _control=True)
component_context.input.on_finalize = partial(component_context.stop) node_context.input.on_finalize = partial(node_context.stop)
def __getitem__(self, item): def __getitem__(self, item):
return self.nodes[item] return self.nodes[item]

View File

@ -7,7 +7,8 @@ from bonobo.core.inputs import Input
from bonobo.core.statistics import WithStatistics from bonobo.core.statistics import WithStatistics
from bonobo.errors import InactiveReadableError from bonobo.errors import InactiveReadableError
from bonobo.execution.base import LoopingExecutionContext from bonobo.execution.base import LoopingExecutionContext
from bonobo.structs.bags import Bag, ErrorBag from bonobo.structs.bags import Bag
from bonobo.util.errors import is_error
from bonobo.util.iterators import iter_if_not_sequence from bonobo.util.iterators import iter_if_not_sequence
@ -32,7 +33,13 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip() return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip()
def __repr__(self): def __repr__(self):
return '<' + self.__str__() + '>' stats = self.get_statistics_as_string().strip()
return '<{}({}{}){}>'.format(
type(self).__name__,
'+' if self.alive else '',
self.__name__,
(' ' + stats) if stats else '',
)
def recv(self, *messages): def recv(self, *messages):
""" """
@ -116,10 +123,6 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
self.push(_resolve(input_bag, result)) self.push(_resolve(input_bag, result))
def is_error(bag):
return isinstance(bag, ErrorBag)
def _resolve(input_bag, output): def _resolve(input_bag, output):
# NotModified means to send the input unmodified to output. # NotModified means to send the input unmodified to output.
if output is NOT_MODIFIED: if output is NOT_MODIFIED:

View File

@ -1,10 +1,12 @@
import time import time
import traceback
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
from bonobo.strategies.base import Strategy from bonobo.strategies.base import Strategy
from bonobo.structs.bags import Bag from bonobo.structs.bags import Bag
from bonobo.util.errors import print_error
class ExecutorStrategy(Strategy): class ExecutorStrategy(Strategy):
@ -27,20 +29,32 @@ class ExecutorStrategy(Strategy):
futures = [] futures = []
for plugin_context in context.plugins: for plugin_context in context.plugins:
def _runner(plugin_context=plugin_context): def _runner(plugin_context=plugin_context):
plugin_context.start() try:
plugin_context.loop() plugin_context.start()
plugin_context.stop() plugin_context.loop()
plugin_context.stop()
except Exception as exc:
print_error(exc, traceback.format_exc(), prefix='Error in plugin context', context=plugin_context)
futures.append(executor.submit(_runner)) futures.append(executor.submit(_runner))
for node_context in context.nodes: for node_context in context.nodes:
def _runner(node_context=node_context): def _runner(node_context=node_context):
node_context.start() try:
node_context.loop() node_context.start()
node_context.stop() except Exception as exc:
print_error(exc, traceback.format_exc(), prefix='Could not start node context',
context=node_context)
node_context.input.on_end()
else:
node_context.loop()
try:
node_context.stop()
except Exception as exc:
print_error(exc, traceback.format_exc(), prefix='Could not stop node context', context=node_context)
futures.append(executor.submit(_runner)) futures.append(executor.submit(_runner))

30
bonobo/util/errors.py Normal file
View File

@ -0,0 +1,30 @@
import sys
from bonobo.structs.bags import ErrorBag
def is_error(bag):
return isinstance(bag, ErrorBag)
def print_error(exc, trace, context=None, prefix=''):
"""
Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception
or somehow make me think it is an exception, I'll handle it.
:param exc: the culprit
:param trace: Hercule Poirot's logbook.
:return: to hell
"""
from colorama import Fore, Style
print(
Style.BRIGHT,
Fore.RED,
'\U0001F4A3 {}{}{}'.format((prefix + ': ') if prefix else '', type(exc).__name__,
' in {!r}'.format(context) if context else ''),
Style.RESET_ALL,
sep='',
file=sys.stderr,
)
print(trace)