Merge remote-tracking branch 'origin/better_errors' into better_errors

This commit is contained in:
Romain Dorgueil
2018-07-22 07:55:49 +02:00
21 changed files with 96 additions and 82 deletions

View File

@ -72,11 +72,12 @@ class ConfigurableMeta(type):
try:
import _functools
except:
except ImportError:
import functools
PartiallyConfigured = functools.partial
else:
class PartiallyConfigured(_functools.partial):
@property # TODO XXX cache this
def _options_values(self):

View File

@ -1,5 +1,3 @@
import inspect
import pprint
import re
import threading
import types

View File

@ -51,7 +51,7 @@ if __name__ == '__main__':
s3.head_object(
Bucket='bonobo-examples', Key=s3_path
)
except:
except Exception:
s3.upload_file(
local_path,
'bonobo-examples',

View File

@ -1,7 +1,6 @@
import logging
import sys
from contextlib import contextmanager
from logging import ERROR
from bonobo.util import deprecated
from bonobo.util.objects import Wrapper, get_name
@ -13,7 +12,7 @@ def recoverable(error_handler):
try:
yield
except Exception as exc: # pylint: disable=broad-except
error_handler(*sys.exc_info(), level=ERROR)
error_handler(*sys.exc_info(), level=logging.ERROR)
@contextmanager
@ -21,7 +20,7 @@ def unrecoverable(error_handler):
try:
yield
except Exception as exc: # pylint: disable=broad-except
error_handler(*sys.exc_info(), level=ERROR)
error_handler(*sys.exc_info(), level=logging.ERROR)
raise # raise unrecoverableerror from exc ?
@ -54,8 +53,7 @@ class Lifecycle:
@property
def should_loop(self):
# TODO XXX started/stopped?
return not any((self.defunct, self.killed))
return self.alive and not any((self.defunct, self.killed))
@property
def status(self):

View File

@ -1,14 +1,19 @@
import logging
from functools import partial
from queue import Empty
from time import sleep
from bonobo.config import create_container
from bonobo.constants import BEGIN, END
from bonobo.errors import InactiveReadableError
from bonobo.execution import events
from bonobo.execution.contexts.base import BaseContext
from bonobo.execution.contexts.node import NodeExecutionContext
from bonobo.execution.contexts.plugin import PluginExecutionContext
from whistle import EventDispatcher
logger = logging.getLogger(__name__)
class GraphExecutionContext(BaseContext):
"""
@ -104,11 +109,16 @@ class GraphExecutionContext(BaseContext):
sleep(self.TICK_PERIOD)
def loop(self):
while self.should_loop:
self.tick()
for node in self.nodes:
if node.should_loop:
nodes = set(node for node in self.nodes if node.should_loop)
while self.should_loop and len(nodes):
self.tick(pause=False)
for node in list(nodes):
try:
node.step()
except Empty:
continue
except InactiveReadableError:
nodes.discard(node)
def stop(self, stopper=None):
super(GraphExecutionContext, self).stop()

View File

@ -30,6 +30,7 @@ class NodeExecutionContext(BaseContext, WithStatistics):
a service implementation, or a value holder).
"""
def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs=None):
"""
Node execution context has the responsibility fo storing the state of a transformation during its execution.
@ -92,11 +93,15 @@ class NodeExecutionContext(BaseContext, WithStatistics):
# Not normal to have a partially configured object here, so let's warn the user instead of having get into
# the hard trouble of understanding that by himself.
raise TypeError(
'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format(self.wrapped)
'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format(
self.wrapped
)
) from exc
else:
raise TypeError(
'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format(self.wrapped)
'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format(
self.wrapped
)
)
self._stack.setup(self)
except Exception:
@ -120,20 +125,25 @@ class NodeExecutionContext(BaseContext, WithStatistics):
break
except Empty:
sleep(TICK_PERIOD) # XXX: How do we determine this constant?
continue
except (
NotImplementedError,
UnrecoverableError,
):
self.fatal(sys.exc_info()) # exit loop
except Exception: # pylint: disable=broad-except
self.error(sys.exc_info()) # does not exit loop
except BaseException:
self.fatal(sys.exc_info()) # exit loop
logger.debug('Node loop ends for {!r}.'.format(self))
def step(self):
try:
self._step()
except InactiveReadableError:
raise
except (
NotImplementedError,
UnrecoverableError,
):
self.fatal(sys.exc_info()) # exit loop
except Exception: # pylint: disable=broad-except
self.error(sys.exc_info()) # does not exit loop
except BaseException:
self.fatal(sys.exc_info()) # exit loop
def _step(self):
"""
A single step in the loop.
@ -280,7 +290,7 @@ class NodeExecutionContext(BaseContext, WithStatistics):
If Queue raises (like Timeout or Empty), stat won't be changed.
"""
input_bag = self.input.get()
input_bag = self.input.get(timeout=0)
# Store or check input type
if self._input_type is None:

View File

@ -29,7 +29,7 @@ class ExecutorStrategy(Strategy):
with self.create_executor() as executor:
try:
context.start(self.get_starter(executor, futures))
except:
except Exception:
logger.critical('Exception caught while starting execution context.', exc_info=sys.exc_info())
while context.alive:
@ -53,14 +53,14 @@ class ExecutorStrategy(Strategy):
try:
with node:
node.loop()
except:
except Exception:
logging.getLogger(__name__).critical(
'Critical error in threadpool node starter.', exc_info=sys.exc_info()
)
try:
futures.append(executor.submit(_runner))
except:
except Exception:
logging.getLogger(__name__).critical('futures.append', exc_info=sys.exc_info())
return starter

View File

@ -1,12 +1,11 @@
import csv
from bonobo.config import Option, use_raw_input, use_context
from bonobo.config import Option, use_context
from bonobo.config.options import Method, RenamedOption
from bonobo.constants import NOT_MODIFIED
from bonobo.nodes.io.base import FileHandler
from bonobo.nodes.io.file import FileReader, FileWriter
from bonobo.util import ensure_tuple
from bonobo.util.bags import BagType
class CsvHandler(FileHandler):

View File

@ -1,5 +1,4 @@
from bonobo.plugins import Plugin
from raven import Client
class SentryPlugin(Plugin):

View File

@ -89,6 +89,7 @@ class Registry:
default_registry = Registry()
def create_reader(name, *args, format=None, registry=default_registry, **kwargs):
"""
Create a reader instance, guessing its factory using filename (and eventually format).
@ -103,6 +104,7 @@ def create_reader(name, *args, format=None, registry=default_registry, **kwargs)
"""
return registry.get_reader_factory_for(name, format=format)(name, *args, **kwargs)
def create_writer(name, *args, format=None, registry=default_registry, **kwargs):
"""
Create a writer instance, guessing its factory using filename (and eventually format).

View File

@ -52,7 +52,7 @@ class Setting:
def set(self, value):
value = self.formatter(value) if self.formatter else value
if self.validator and not self.validator(value):
raise ValidationError('Invalid value {!r} for setting {}.'.format(value, self.name))
raise ValidationError(self, 'Invalid value {!r} for setting {!r}.'.format(value, self.name))
self.value = value
def set_if_true(self, value):

View File

@ -57,7 +57,6 @@ def get_argument_parser(parser=None):
:return:
"""
if parser is None:
import argparse
parser = argparse.ArgumentParser()
# Store globally to be able to warn the user about the fact he's probably wrong not to pass a parser to