Merge branch 'develop' into feature/io_pickle

This commit is contained in:
Romain Dorgueil
2017-05-25 06:36:11 -07:00
committed by GitHub
35 changed files with 449 additions and 158 deletions

View File

@ -1,5 +1,3 @@
import warnings
from bonobo.structs import Bag, Graph, Token
from bonobo.nodes import CsvReader, CsvWriter, FileReader, FileWriter, Filter, JsonReader, JsonWriter, Limit, \
PrettyPrint, PickleWriter, PickleReader, Tee, count, identity, noop, pprint
@ -45,7 +43,6 @@ def run(graph, strategy=None, plugins=None, services=None):
plugins = plugins or []
from bonobo import settings
settings.check()
if not settings.QUIET: # pragma: no cover
@ -85,7 +82,9 @@ def open_fs(fs_url, *args, **kwargs):
:returns: :class:`~fs.base.FS` object
"""
from fs import open_fs as _open_fs
return _open_fs(str(fs_url), *args, **kwargs)
from os.path import expanduser
return _open_fs(expanduser(str(fs_url)), *args, **kwargs)
# bonobo.nodes

View File

@ -20,10 +20,8 @@ def get_default_services(filename, services=None):
'__name__': '__bonobo__',
'__file__': services_filename,
}
try:
exec(code, context)
except Exception:
raise
exec(code, context)
return {
**context[DEFAULT_SERVICES_ATTR](),
**(services or {}),

View File

@ -1,13 +1,15 @@
from bonobo.config.configurables import Configurable
from bonobo.config.options import Option, Method
from bonobo.config.processors import ContextProcessor
from bonobo.config.services import Container, Service
from bonobo.config.services import Container, Service, Exclusive
# bonobo.config public programming interface
__all__ = [
'Configurable',
'Container',
'ContextProcessor',
'Option',
'Exclusive',
'Method',
'Option',
'Service',
]

View File

@ -1,6 +1,6 @@
from bonobo.config.options import Method, Option
from bonobo.config.processors import ContextProcessor
from bonobo.errors import ConfigurationError
from bonobo.errors import ConfigurationError, AbstractError
__all__ = [
'Configurable',

View File

@ -1,4 +1,3 @@
import types
from collections import Iterable
from contextlib import contextmanager
@ -132,14 +131,7 @@ def resolve_processors(mixed):
try:
yield from mixed.__processors__
except AttributeError:
# old code, deprecated usage
if isinstance(mixed, types.FunctionType):
yield from getattr(mixed, _CONTEXT_PROCESSORS_ATTR, ())
for cls in reversed((mixed if isinstance(mixed, type) else type(mixed)).__mro__):
yield from cls.__dict__.get(_CONTEXT_PROCESSORS_ATTR, ())
return ()
yield from ()
get_context_processors = deprecated_alias('get_context_processors', resolve_processors)

View File

@ -1,7 +1,10 @@
import re
import threading
import types
from contextlib import ContextDecorator
from bonobo.config.options import Option
from bonobo.errors import MissingServiceImplementationError
_service_name_re = re.compile(r"^[^\d\W]\w*(:?\.[^\d\W]\w*)*$", re.UNICODE)
@ -78,8 +81,48 @@ class Container(dict):
if not name in self:
if default:
return default
raise KeyError('Cannot resolve service {!r} using provided service collection.'.format(name))
raise MissingServiceImplementationError(
'Cannot resolve service {!r} using provided service collection.'.format(name)
)
value = super().get(name)
# XXX this is not documented and can lead to errors.
if isinstance(value, types.LambdaType):
value = value(self)
return value
class Exclusive(ContextDecorator):
"""
Decorator and context manager used to require exclusive usage of an object, most probably a service. It's usefull
for example if call order matters on a service implementation (think of an http api that requires a nonce or version
parameter ...).
Usage:
>>> def handler(some_service):
... with Exclusive(some_service):
... some_service.call_1()
... some_service.call_2()
... some_service.call_3()
This will ensure that nobody else is using the same service while in the "with" block, using a lock primitive to
ensure that.
"""
_locks = {}
def __init__(self, wrapped):
self._wrapped = wrapped
def get_lock(self):
_id = id(self._wrapped)
if not _id in Exclusive._locks:
Exclusive._locks[_id] = threading.RLock()
return Exclusive._locks[_id]
def __enter__(self):
self.get_lock().acquire()
return self._wrapped
def __exit__(self, *exc):
self.get_lock().release()

View File

@ -55,4 +55,8 @@ class ProhibitedOperationError(RuntimeError):
class ConfigurationError(Exception):
pass
pass
class MissingServiceImplementationError(KeyError):
pass

View File

@ -0,0 +1,5 @@
from bonobo import get_examples_path, open_fs
def get_services():
return {'fs': open_fs(get_examples_path())}

View File

@ -0,0 +1,3 @@
from bonobo.util.python import require
graph = require('strings').graph

View File

@ -9,6 +9,14 @@ from bonobo.util.errors import print_error
from bonobo.util.objects import Wrapper, get_name
@contextmanager
def recoverable(error_handler):
try:
yield
except Exception as exc: # pylint: disable=broad-except
error_handler(exc, traceback.format_exc())
@contextmanager
def unrecoverable(error_handler):
try:
@ -55,10 +63,9 @@ class LoopingExecutionContext(Wrapper):
raise RuntimeError('Cannot start a node twice ({}).'.format(get_name(self)))
self._started = True
self._stack = ContextCurrifier(self.wrapped, *self._get_initial_context())
with unrecoverable(self.handle_error):
self._stack.setup(self)
self._stack = ContextCurrifier(self.wrapped, *self._get_initial_context())
self._stack.setup(self)
for enhancer in self._enhancers:
with unrecoverable(self.handle_error):
@ -82,7 +89,7 @@ class LoopingExecutionContext(Wrapper):
return
try:
with unrecoverable(self.handle_error):
if self._stack:
self._stack.teardown()
finally:
self._stopped = True

View File

@ -1,6 +1,4 @@
import traceback
from bonobo.execution.base import LoopingExecutionContext
from bonobo.execution.base import LoopingExecutionContext, recoverable
class PluginExecutionContext(LoopingExecutionContext):
@ -14,21 +12,14 @@ class PluginExecutionContext(LoopingExecutionContext):
def start(self):
super().start()
try:
with recoverable(self.handle_error):
self.wrapped.initialize()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
def shutdown(self):
try:
with recoverable(self.handle_error):
self.wrapped.finalize()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
finally:
self.alive = False
self.alive = False
def step(self):
try:
with recoverable(self.handle_error):
self.wrapped.run()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())

View File

@ -1,6 +1,7 @@
import functools
from pprint import pprint as _pprint
import itertools
from colorama import Fore, Style
from bonobo.config import Configurable, Option
@ -69,6 +70,12 @@ def _count_counter(self, context):
context.send(Bag(counter._value))
class PrettyPrinter(Configurable):
def call(self, *args, **kwargs):
for i, (item, value) in enumerate(itertools.chain(enumerate(args), kwargs.items())):
print(' ' if i else '', item, '=', str(value).strip().replace('\n', '\n' + CLEAR_EOL), CLEAR_EOL)
pprint = Tee(_pprint)

View File

@ -3,6 +3,8 @@ import csv
from bonobo.config import Option
from bonobo.config.processors import ContextProcessor
from bonobo.constants import NOT_MODIFIED
from bonobo.errors import ConfigurationError, ValidationError
from bonobo.structs import Bag
from bonobo.util.objects import ValueHolder
from .file import FileHandler, FileReader, FileWriter
@ -28,6 +30,14 @@ class CsvHandler(FileHandler):
headers = Option(tuple)
def validate_csv_output_format(v):
if callable(v):
return v
if v in {'dict', 'kwargs'}:
return v
raise ValidationError('Unsupported format {!r}.'.format(v))
class CsvReader(CsvHandler, FileReader):
"""
Reads a CSV and yield the values as dicts.
@ -39,13 +49,23 @@ class CsvReader(CsvHandler, FileReader):
"""
skip = Option(int, default=0)
output_format = Option(validate_csv_output_format, default='dict')
@ContextProcessor
def csv_headers(self, context, fs, file):
yield ValueHolder(self.headers)
def get_output_formater(self):
if callable(self.output_format):
return self.output_format
elif isinstance(self.output_format, str):
return getattr(self, '_format_as_' + self.output_format)
else:
raise ConfigurationError('Unsupported format {!r} for {}.'.format(self.output_format, type(self).__name__))
def read(self, fs, file, headers):
reader = csv.reader(file, delimiter=self.delimiter, quotechar=self.quotechar)
formater = self.get_output_formater()
if not headers.get():
headers.set(next(reader))
@ -60,7 +80,13 @@ class CsvReader(CsvHandler, FileReader):
if len(row) != field_count:
raise ValueError('Got a line with %d fields, expecting %d.' % (len(row), field_count, ))
yield dict(zip(headers.value, row))
yield formater(headers.get(), row)
def _format_as_dict(self, headers, values):
return dict(zip(headers, values))
def _format_as_kwargs(self, headers, values):
return Bag(**dict(zip(headers, values)))
class CsvWriter(CsvHandler, FileWriter):

View File

@ -4,11 +4,6 @@ from bonobo.config.processors import ContextProcessor
from bonobo.constants import NOT_MODIFIED
from bonobo.util.objects import ValueHolder
__all__ = [
'FileReader',
'FileWriter',
]
class FileHandler(Configurable):
"""Abstract component factory for file-related components.
@ -23,6 +18,7 @@ class FileHandler(Configurable):
path = Option(str, required=True, positional=True) # type: str
eol = Option(str, default='\n') # type: str
mode = Option(str) # type: str
encoding = Option(str, default='utf-8') # type: str
fs = Service('fs') # type: str
@ -32,7 +28,7 @@ class FileHandler(Configurable):
yield file
def open(self, fs):
return fs.open(self.path, self.mode)
return fs.open(self.path, self.mode, encoding=self.encoding)
class Reader(FileHandler):

View File

@ -3,10 +3,6 @@ import json
from bonobo.config.processors import ContextProcessor
from .file import FileWriter, FileReader
__all__ = [
'JsonWriter',
]
class JsonHandler():
eol = ',\n'

0
bonobo/nodes/io/xml.py Normal file
View File

View File

@ -36,7 +36,7 @@ class ExecutorStrategy(Strategy):
plugin_context.loop()
plugin_context.stop()
except Exception as exc:
print_error(exc, traceback.format_exc(), prefix='Error in plugin context', context=plugin_context)
print_error(exc, traceback.format_exc(), context=plugin_context)
futures.append(executor.submit(_runner))
@ -46,9 +46,7 @@ class ExecutorStrategy(Strategy):
try:
node_context.start()
except Exception as exc:
print_error(
exc, traceback.format_exc(), prefix='Could not start node context', context=node_context
)
print_error(exc, traceback.format_exc(), context=node_context, method='start')
node_context.input.on_end()
else:
node_context.loop()
@ -56,7 +54,7 @@ class ExecutorStrategy(Strategy):
try:
node_context.stop()
except Exception as exc:
print_error(exc, traceback.format_exc(), prefix='Could not stop node context', context=node_context)
print_error(exc, traceback.format_exc(), context=node_context, method='stop')
futures.append(executor.submit(_runner))

View File

@ -75,6 +75,14 @@ class Bag:
raise TypeError('Could not apply bag to {}.'.format(func_or_iter))
def get(self):
"""
Get a 2 element tuple of this bag's args and kwargs.
:return: tuple
"""
return self.args, self.kwargs
def extend(self, *args, **kwargs):
return type(self)(*args, _parent=self, **kwargs)

View File

@ -1,5 +1,7 @@
import sys
from textwrap import indent
from bonobo import settings
from bonobo.structs.bags import ErrorBag
@ -7,7 +9,14 @@ def is_error(bag):
return isinstance(bag, ErrorBag)
def print_error(exc, trace, context=None, prefix=''):
def _get_error_message(exc):
if hasattr(exc, '__str__'):
message = str(exc)
return message[0].upper() + message[1:]
return '\n'.join(exc.args),
def print_error(exc, trace, context=None, method=None):
"""
Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception
or somehow make me think it is an exception, I'll handle it.
@ -18,14 +27,20 @@ def print_error(exc, trace, context=None, prefix=''):
"""
from colorama import Fore, Style
prefix = '{}{} | {}'.format(Fore.RED, Style.BRIGHT, Style.RESET_ALL)
print(
Style.BRIGHT,
Fore.RED,
'\U0001F4A3 {}{}{}'.format(
(prefix + ': ') if prefix else '', type(exc).__name__, ' in {!r}'.format(context) if context else ''
),
type(exc).__name__,
' (in {}{})'.format(type(context).__name__, '.{}()'.format(method) if method else '') if context else '',
Style.RESET_ALL,
'\n',
indent(_get_error_message(exc), prefix + Style.BRIGHT),
Style.RESET_ALL,
sep='',
file=sys.stderr,
)
print(trace)
print(prefix, file=sys.stderr)
print(indent(trace, prefix, predicate=lambda line: True), file=sys.stderr)

View File

@ -1,3 +1,4 @@
from contextlib import contextmanager
from unittest.mock import MagicMock
from bonobo.execution.node import NodeExecutionContext
@ -7,3 +8,12 @@ class CapturingNodeExecutionContext(NodeExecutionContext):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.send = MagicMock()
@contextmanager
def optional_contextmanager(cm, *, ignore=False):
if cm is None or ignore:
yield
else:
with cm:
yield