From 4c2287ebf0cc5a232c33a35815165a0e9e68ca3a Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Wed, 11 Jul 2018 15:46:23 +0200 Subject: [PATCH 01/13] Partially configured object: shows what is missing instead of cryptic error only --- bonobo/config/configurables.py | 1 - bonobo/execution/contexts/base.py | 10 ++++++---- bonobo/execution/contexts/node.py | 17 ++++++++++++----- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/bonobo/config/configurables.py b/bonobo/config/configurables.py index 50f41fc..eedd6ed 100644 --- a/bonobo/config/configurables.py +++ b/bonobo/config/configurables.py @@ -77,7 +77,6 @@ except: PartiallyConfigured = functools.partial else: - class PartiallyConfigured(_functools.partial): @property # TODO XXX cache this def _options_values(self): diff --git a/bonobo/execution/contexts/base.py b/bonobo/execution/contexts/base.py index 953f13c..4647d91 100644 --- a/bonobo/execution/contexts/base.py +++ b/bonobo/execution/contexts/base.py @@ -3,10 +3,9 @@ import sys from contextlib import contextmanager from logging import ERROR -from mondrian import term - from bonobo.util import deprecated from bonobo.util.objects import Wrapper, get_name +from mondrian import term @contextmanager @@ -23,7 +22,7 @@ def unrecoverable(error_handler): yield except Exception as exc: # pylint: disable=broad-except error_handler(*sys.exc_info(), level=ERROR) - raise # raise unrecoverableerror from x ? + raise # raise unrecoverableerror from exc ? class Lifecycle: @@ -60,7 +59,10 @@ class Lifecycle: @property def status(self): - """One character status for this node. """ + """ + One character status for this node. + + """ if self._defunct: return '!' if not self.started: diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index 316d9b8..cf9ec67 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -77,11 +77,18 @@ class NodeExecutionContext(BaseContext, WithStatistics): initial = self._get_initial_context() self._stack = ContextCurrifier(self.wrapped, *initial.args, **initial.kwargs) if isconfigurabletype(self.wrapped): - # Not normal to have a partially configured object here, so let's warn the user instead of having get into - # the hard trouble of understanding that by himself. - raise TypeError( - 'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format(self.wrapped) - ) + try: + self.wrapped = self.wrapped(_final=True) + except Exception as exc: + # Not normal to have a partially configured object here, so let's warn the user instead of having get into + # the hard trouble of understanding that by himself. + raise TypeError( + 'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format(self.wrapped) + ) from exc + else: + raise TypeError( + 'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format(self.wrapped) + ) self._stack.setup(self) except Exception: # Set the logging level to the lowest possible, to avoid double log. From 8b3215ad254668e586a90cbb0f7e41906a74db0f Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Wed, 11 Jul 2018 17:09:53 +0200 Subject: [PATCH 02/13] wip: refactoring context to share base. --- bonobo/execution/contexts/base.py | 12 +++- bonobo/execution/contexts/graph.py | 39 +++++++++--- bonobo/execution/contexts/node.py | 11 +++- .../contexts/test_execution_contexts_graph.py | 59 +++++++++++++++++++ 4 files changed, 110 insertions(+), 11 deletions(-) create mode 100644 tests/execution/contexts/test_execution_contexts_graph.py diff --git a/bonobo/execution/contexts/base.py b/bonobo/execution/contexts/base.py index 4647d91..b7f07c5 100644 --- a/bonobo/execution/contexts/base.py +++ b/bonobo/execution/contexts/base.py @@ -99,9 +99,6 @@ class Lifecycle: self._stopped = True - if self._stopped: # Stopping twice has no effect - return - def kill(self): if not self.started: raise RuntimeError('Cannot kill an unstarted context.') @@ -136,3 +133,12 @@ class BaseContext(Lifecycle, Wrapper): Lifecycle.__init__(self) Wrapper.__init__(self, wrapped) self.parent = parent + + @property + def xstatus(self): + """ + UNIX-like exit status, only coherent if the context has stopped. + """ + if self._defunct: + return 70 + return 0 diff --git a/bonobo/execution/contexts/graph.py b/bonobo/execution/contexts/graph.py index a6559a3..3c029b5 100644 --- a/bonobo/execution/contexts/graph.py +++ b/bonobo/execution/contexts/graph.py @@ -4,12 +4,17 @@ from time import sleep from bonobo.config import create_container from bonobo.constants import BEGIN, END from bonobo.execution import events +from bonobo.execution.contexts.base import BaseContext from bonobo.execution.contexts.node import NodeExecutionContext from bonobo.execution.contexts.plugin import PluginExecutionContext from whistle import EventDispatcher -class GraphExecutionContext: +class GraphExecutionContext(BaseContext): + """ + Stores the actual state of a graph execution, and manages its lifecycle. + + """ NodeExecutionContextType = NodeExecutionContext PluginExecutionContextType = PluginExecutionContext @@ -17,17 +22,24 @@ class GraphExecutionContext: @property def started(self): + if not len(self.nodes): + return super(GraphExecutionContext, self).started return any(node.started for node in self.nodes) @property def stopped(self): + if not len(self.nodes): + return super(GraphExecutionContext, self).stopped return all(node.started and node.stopped for node in self.nodes) @property def alive(self): + if not len(self.nodes): + return super(GraphExecutionContext, self).alive return any(node.alive for node in self.nodes) - def __init__(self, graph, plugins=None, services=None, dispatcher=None): + def __init__(self, graph, *, plugins=None, services=None, dispatcher=None): + super(GraphExecutionContext, self).__init__(graph) self.dispatcher = dispatcher or EventDispatcher() self.graph = graph self.nodes = [self.create_node_execution_context_for(node) for node in self.graph] @@ -74,6 +86,8 @@ class GraphExecutionContext: self.dispatcher.dispatch(name, events.ExecutionEvent(self)) def start(self, starter=None): + super(GraphExecutionContext, self).start() + self.register_plugins() self.dispatch(events.START) self.tick(pause=False) @@ -89,13 +103,16 @@ class GraphExecutionContext: if pause: sleep(self.TICK_PERIOD) - def kill(self): - self.dispatch(events.KILL) - for node_context in self.nodes: - node_context.kill() - self.tick() + def loop(self): + while self.should_loop: + self.tick() + for node in self.nodes: + if node.should_loop: + node.step() def stop(self, stopper=None): + super(GraphExecutionContext, self).stop() + self.dispatch(events.STOP) for node_context in self.nodes: if stopper is None: @@ -106,6 +123,14 @@ class GraphExecutionContext: self.dispatch(events.STOPPED) self.unregister_plugins() + def kill(self): + super(GraphExecutionContext, self).kill() + + self.dispatch(events.KILL) + for node_context in self.nodes: + node_context.kill() + self.tick() + def register_plugins(self): for plugin_context in self.plugins: plugin_context.register() diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index cf9ec67..07cbf16 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -21,6 +21,15 @@ UnboundArguments = namedtuple('UnboundArguments', ['args', 'kwargs']) class NodeExecutionContext(BaseContext, WithStatistics): + """ + Stores the actual context of a node, within a given graph execution, accessed as `self.parent`. + + A special case exist, mostly for testing purpose, where there is no parent context. + + Can be used as a context manager, also very convenient for testing nodes that requires some external context (like + a service implementation, or a value holder). + + """ def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs=None): """ Node execution context has the responsibility fo storing the state of a transformation during its execution. @@ -170,7 +179,7 @@ class NodeExecutionContext(BaseContext, WithStatistics): if self._stack: try: self._stack.teardown() - except: + except Exception as exc: self.fatal(sys.exc_info()) super().stop() diff --git a/tests/execution/contexts/test_execution_contexts_graph.py b/tests/execution/contexts/test_execution_contexts_graph.py new file mode 100644 index 0000000..95b1ea4 --- /dev/null +++ b/tests/execution/contexts/test_execution_contexts_graph.py @@ -0,0 +1,59 @@ +from bonobo import Graph +from bonobo.execution.contexts import GraphExecutionContext + + +def raise_an_error(*args, **kwargs): + raise Exception('Careful, man, there\'s a beverage here!') + + +def raise_an_unrecoverrable_error(*args, **kwargs): + raise Exception('You are entering a world of pain!') + + +def test_lifecycle_of_empty_graph(): + graph = Graph() + with GraphExecutionContext(graph) as context: + assert context.started + assert context.alive + assert not context.stopped + assert context.started + assert not context.alive + assert context.stopped + assert not context.xstatus + + +def test_lifecycle_of_nonempty_graph(): + graph = Graph([1, 2, 3], print) + with GraphExecutionContext(graph) as context: + assert context.started + assert context.alive + assert not context.stopped + assert context.started + assert not context.alive + assert context.stopped + assert not context.xstatus + + +def test_lifecycle_of_graph_with_recoverable_error(): + graph = Graph([1, 2, 3], raise_an_error, print) + with GraphExecutionContext(graph) as context: + assert context.started + assert context.alive + assert not context.stopped + assert context.started + assert not context.alive + assert context.stopped + assert not context.xstatus + + +def test_lifecycle_of_graph_with_unrecoverable_error(): + graph = Graph([1, 2, 3], raise_an_unrecoverrable_error, print) + with GraphExecutionContext(graph) as context: + assert context.started + assert context.alive + assert not context.stopped + context.loop() + assert context.started + assert not context.alive + assert context.stopped + assert not context.xstatus From d9f2fd3009e5f9b734aacdae3ee82d6b5e4c1467 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Thu, 12 Jul 2018 16:25:26 +0200 Subject: [PATCH 03/13] config: Change remove() to discard() to allow overriding value using simple attributes. --- bonobo/config/configurables.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/bonobo/config/configurables.py b/bonobo/config/configurables.py index eedd6ed..20ceca5 100644 --- a/bonobo/config/configurables.py +++ b/bonobo/config/configurables.py @@ -140,15 +140,14 @@ class Configurable(metaclass=ConfigurableMeta): break # option orders make all positional options first, job done. if not isoption(getattr(cls, name)): - missing.remove(name) + missing.discard(name) continue if len(args) <= position: break # no more positional arguments given. position += 1 - if name in missing: - missing.remove(name) + missing.discard(name) # complain if there is more options than possible. extraneous = set(kwargs.keys()) - (set(next(zip(*options))) if len(options) else set()) From 6a1203602f7f7fe8ff5f16b48a8bfbb7191ed5f5 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Mon, 16 Jul 2018 10:00:28 +0200 Subject: [PATCH 04/13] Minor tweaks about code quality. --- bonobo/config/configurables.py | 2 +- bonobo/config/services.py | 2 -- bonobo/examples/datasets/__main__.py | 2 +- bonobo/execution/contexts/base.py | 5 ++--- bonobo/execution/strategies/executor.py | 6 +++--- bonobo/nodes/io/csv.py | 3 +-- bonobo/plugins/sentry.py | 1 - bonobo/settings.py | 2 +- bonobo/util/environ.py | 1 - 9 files changed, 9 insertions(+), 15 deletions(-) diff --git a/bonobo/config/configurables.py b/bonobo/config/configurables.py index 20ceca5..98e21e7 100644 --- a/bonobo/config/configurables.py +++ b/bonobo/config/configurables.py @@ -72,7 +72,7 @@ class ConfigurableMeta(type): try: import _functools -except: +except ImportError: import functools PartiallyConfigured = functools.partial diff --git a/bonobo/config/services.py b/bonobo/config/services.py index 282d88f..eae3b8d 100644 --- a/bonobo/config/services.py +++ b/bonobo/config/services.py @@ -1,5 +1,3 @@ -import inspect -import pprint import re import threading import types diff --git a/bonobo/examples/datasets/__main__.py b/bonobo/examples/datasets/__main__.py index 91f702d..a62ce33 100644 --- a/bonobo/examples/datasets/__main__.py +++ b/bonobo/examples/datasets/__main__.py @@ -51,7 +51,7 @@ if __name__ == '__main__': s3.head_object( Bucket='bonobo-examples', Key=s3_path ) - except: + except Exception: s3.upload_file( local_path, 'bonobo-examples', diff --git a/bonobo/execution/contexts/base.py b/bonobo/execution/contexts/base.py index b7f07c5..e071266 100644 --- a/bonobo/execution/contexts/base.py +++ b/bonobo/execution/contexts/base.py @@ -1,7 +1,6 @@ import logging import sys from contextlib import contextmanager -from logging import ERROR from bonobo.util import deprecated from bonobo.util.objects import Wrapper, get_name @@ -13,7 +12,7 @@ def recoverable(error_handler): try: yield except Exception as exc: # pylint: disable=broad-except - error_handler(*sys.exc_info(), level=ERROR) + error_handler(*sys.exc_info(), level=logging.ERROR) @contextmanager @@ -21,7 +20,7 @@ def unrecoverable(error_handler): try: yield except Exception as exc: # pylint: disable=broad-except - error_handler(*sys.exc_info(), level=ERROR) + error_handler(*sys.exc_info(), level=logging.ERROR) raise # raise unrecoverableerror from exc ? diff --git a/bonobo/execution/strategies/executor.py b/bonobo/execution/strategies/executor.py index 1e2d45f..e900ce1 100644 --- a/bonobo/execution/strategies/executor.py +++ b/bonobo/execution/strategies/executor.py @@ -29,7 +29,7 @@ class ExecutorStrategy(Strategy): with self.create_executor() as executor: try: context.start(self.get_starter(executor, futures)) - except: + except Exception: logger.critical('Exception caught while starting execution context.', exc_info=sys.exc_info()) while context.alive: @@ -53,14 +53,14 @@ class ExecutorStrategy(Strategy): try: with node: node.loop() - except: + except Exception: logging.getLogger(__name__).critical( 'Critical error in threadpool node starter.', exc_info=sys.exc_info() ) try: futures.append(executor.submit(_runner)) - except: + except Exception: logging.getLogger(__name__).critical('futures.append', exc_info=sys.exc_info()) return starter diff --git a/bonobo/nodes/io/csv.py b/bonobo/nodes/io/csv.py index d7d03fb..7900dca 100644 --- a/bonobo/nodes/io/csv.py +++ b/bonobo/nodes/io/csv.py @@ -1,12 +1,11 @@ import csv -from bonobo.config import Option, use_raw_input, use_context +from bonobo.config import Option, use_context from bonobo.config.options import Method, RenamedOption from bonobo.constants import NOT_MODIFIED from bonobo.nodes.io.base import FileHandler from bonobo.nodes.io.file import FileReader, FileWriter from bonobo.util import ensure_tuple -from bonobo.util.bags import BagType class CsvHandler(FileHandler): diff --git a/bonobo/plugins/sentry.py b/bonobo/plugins/sentry.py index 44799da..4704c0b 100644 --- a/bonobo/plugins/sentry.py +++ b/bonobo/plugins/sentry.py @@ -1,5 +1,4 @@ from bonobo.plugins import Plugin -from raven import Client class SentryPlugin(Plugin): diff --git a/bonobo/settings.py b/bonobo/settings.py index 799ba3d..d87a329 100644 --- a/bonobo/settings.py +++ b/bonobo/settings.py @@ -52,7 +52,7 @@ class Setting: def set(self, value): value = self.formatter(value) if self.formatter else value if self.validator and not self.validator(value): - raise ValidationError('Invalid value {!r} for setting {}.'.format(value, self.name)) + raise ValidationError(self, 'Invalid value {!r} for setting {!r}.'.format(value, self.name)) self.value = value def set_if_true(self, value): diff --git a/bonobo/util/environ.py b/bonobo/util/environ.py index b344d29..980d1db 100644 --- a/bonobo/util/environ.py +++ b/bonobo/util/environ.py @@ -57,7 +57,6 @@ def get_argument_parser(parser=None): :return: """ if parser is None: - import argparse parser = argparse.ArgumentParser() # Store globally to be able to warn the user about the fact he's probably wrong not to pass a parser to From 7b365e014d4941a2133a6690912c7645a030419a Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Mon, 16 Jul 2018 11:56:46 +0200 Subject: [PATCH 05/13] aggregates: Adds first version of Reduce() based on @levic work. --- bonobo/nodes/aggregation.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 bonobo/nodes/aggregation.py diff --git a/bonobo/nodes/aggregation.py b/bonobo/nodes/aggregation.py new file mode 100644 index 0000000..f09cc17 --- /dev/null +++ b/bonobo/nodes/aggregation.py @@ -0,0 +1,16 @@ +from bonobo.config import Configurable, Method, Option, ContextProcessor, use_raw_input +from bonobo.util import ValueHolder + + +class Reduce(Configurable): + function = Method() + initializer = Option(required=False) + + @ContextProcessor + def buffer(self, context): + values = yield ValueHolder(self.initializer() if callable(self.initializer) else self.initializer) + context.send(values.get()) + + @use_raw_input + def __call__(self, values, bag): + values.set(self.function(values.get(), bag)) From 71cd606fadfa6e07e556fc81203adee10005d1f8 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Mon, 16 Jul 2018 12:12:15 +0200 Subject: [PATCH 06/13] experiment: try to autocast when possible --- bonobo/execution/contexts/node.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index 07cbf16..a447261 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -286,11 +286,17 @@ class NodeExecutionContext(BaseContext, WithStatistics): if self._input_type is None: self._input_type = type(input_bag) elif type(input_bag) is not self._input_type: - raise UnrecoverableTypeError( - 'Input type changed between calls to {!r}.\nGot {!r} which is not of type {!r}.'.format( - self.wrapped, input_bag, self._input_type - ) - ) + try: + if type(self._input_type) == tuple: + input_bag = self._input_type(tuple) + else: + input_bag = self._input_type(*input_bag) + except Exception as exc: + raise UnrecoverableTypeError( + 'Input type changed to incompatible type between calls to {!r}.\nGot {!r} which is not of type {!r}.'.format( + self.wrapped, input_bag, self._input_type + ) + ) from exc # Store or check input length, which is a soft fallback in case we're just using tuples if self._input_length is None: From 5780b3648007caf3567d510018286cc8f7d959b8 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Mon, 16 Jul 2018 12:53:15 +0200 Subject: [PATCH 07/13] experiment: try to autocast when possible --- bonobo/execution/contexts/node.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index a447261..e187b89 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -285,10 +285,10 @@ class NodeExecutionContext(BaseContext, WithStatistics): # Store or check input type if self._input_type is None: self._input_type = type(input_bag) - elif type(input_bag) is not self._input_type: + elif type(input_bag) != self._input_type: try: - if type(self._input_type) == tuple: - input_bag = self._input_type(tuple) + if self._input_type == tuple: + input_bag = self._input_type(input_bag) else: input_bag = self._input_type(*input_bag) except Exception as exc: From 66451d03bb8effe95582c68e306839ca2eeefa47 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sun, 22 Jul 2018 07:34:11 +0200 Subject: [PATCH 08/13] work in progress: working on nodes lifecycle. --- Makefile | 2 +- bonobo/execution/contexts/base.py | 3 +-- bonobo/execution/contexts/graph.py | 18 ++++++++++---- bonobo/execution/contexts/node.py | 24 ++++++++++--------- requirements-dev.txt | 18 +++++++------- requirements-docker.txt | 17 +++++++------ requirements-jupyter.txt | 13 +++++----- requirements-sqlalchemy.txt | 19 +++++++-------- requirements.txt | 17 +++++++------ setup.py | 2 +- .../contexts/test_execution_contexts_graph.py | 6 ++--- ...ode.py => test_execution_contexts_node.py} | 0 12 files changed, 74 insertions(+), 65 deletions(-) rename tests/execution/contexts/{test_node.py => test_execution_contexts_node.py} (100%) diff --git a/Makefile b/Makefile index 29446bc..43c4934 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -# Generated by Medikit 0.6.3 on 2018-06-11. +# Generated by Medikit 0.6.3 on 2018-07-22. # All changes will be overriden. # Edit Projectfile and run “make update” (or “medikit update”) to regenerate. diff --git a/bonobo/execution/contexts/base.py b/bonobo/execution/contexts/base.py index e071266..25cca26 100644 --- a/bonobo/execution/contexts/base.py +++ b/bonobo/execution/contexts/base.py @@ -53,8 +53,7 @@ class Lifecycle: @property def should_loop(self): - # TODO XXX started/stopped? - return not any((self.defunct, self.killed)) + return self.alive and not any((self.defunct, self.killed)) @property def status(self): diff --git a/bonobo/execution/contexts/graph.py b/bonobo/execution/contexts/graph.py index 3c029b5..b2362db 100644 --- a/bonobo/execution/contexts/graph.py +++ b/bonobo/execution/contexts/graph.py @@ -1,14 +1,19 @@ +import logging from functools import partial +from queue import Empty from time import sleep from bonobo.config import create_container from bonobo.constants import BEGIN, END +from bonobo.errors import InactiveReadableError from bonobo.execution import events from bonobo.execution.contexts.base import BaseContext from bonobo.execution.contexts.node import NodeExecutionContext from bonobo.execution.contexts.plugin import PluginExecutionContext from whistle import EventDispatcher +logger = logging.getLogger(__name__) + class GraphExecutionContext(BaseContext): """ @@ -104,11 +109,16 @@ class GraphExecutionContext(BaseContext): sleep(self.TICK_PERIOD) def loop(self): - while self.should_loop: - self.tick() - for node in self.nodes: - if node.should_loop: + nodes = set(node for node in self.nodes if node.should_loop) + while self.should_loop and len(nodes): + self.tick(pause=False) + for node in list(nodes): + try: node.step() + except Empty: + continue + except InactiveReadableError: + nodes.discard(node) def stop(self, stopper=None): super(GraphExecutionContext, self).stop() diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index 07cbf16..0affd29 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -120,20 +120,22 @@ class NodeExecutionContext(BaseContext, WithStatistics): break except Empty: sleep(TICK_PERIOD) # XXX: How do we determine this constant? - continue - except ( - NotImplementedError, - UnrecoverableError, - ): - self.fatal(sys.exc_info()) # exit loop - except Exception: # pylint: disable=broad-except - self.error(sys.exc_info()) # does not exit loop - except BaseException: - self.fatal(sys.exc_info()) # exit loop logger.debug('Node loop ends for {!r}.'.format(self)) def step(self): + try: + self._step() + except InactiveReadableError: + raise + except (NotImplementedError, UnrecoverableError, ): + self.fatal(sys.exc_info()) # exit loop + except Exception: # pylint: disable=broad-except + self.error(sys.exc_info()) # does not exit loop + except BaseException: + self.fatal(sys.exc_info()) # exit loop + + def _step(self): """ A single step in the loop. @@ -280,7 +282,7 @@ class NodeExecutionContext(BaseContext, WithStatistics): If Queue raises (like Timeout or Empty), stat won't be changed. """ - input_bag = self.input.get() + input_bag = self.input.get(timeout=0) # Store or check input type if self._input_type is None: diff --git a/requirements-dev.txt b/requirements-dev.txt index 704a481..be0cfb2 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,6 +1,6 @@ -e .[dev] -r requirements.txt -alabaster==0.7.10 +alabaster==0.7.11 arrow==0.12.1 atomicwrites==1.1.5 attrs==18.1.0 @@ -13,7 +13,7 @@ cookiecutter==1.5.1 coverage==4.5.1 docutils==0.14 future==0.16.0 -idna==2.6 +idna==2.7 imagesize==1.0.0 jinja2-time==0.2.0 jinja2==2.10 @@ -22,20 +22,20 @@ more-itertools==4.2.0 packaging==17.1 pluggy==0.6.0 poyo==0.4.1 -py==1.5.3 +py==1.5.4 pygments==2.2.0 pyparsing==2.2.0 pytest-cov==2.5.1 -pytest-timeout==1.2.1 -pytest==3.6.1 +pytest-timeout==1.3.0 +pytest==3.6.3 python-dateutil==2.7.3 -pytz==2018.4 -requests==2.18.4 +pytz==2018.5 +requests==2.19.1 six==1.11.0 snowballstemmer==1.2.1 sphinx-sitemap==0.2 -sphinx==1.7.5 +sphinx==1.7.6 sphinxcontrib-websupport==1.1.0 -urllib3==1.22 +urllib3==1.23 whichcraft==0.4.1 yapf==0.22.0 diff --git a/requirements-docker.txt b/requirements-docker.txt index 08c0bf3..4600a77 100644 --- a/requirements-docker.txt +++ b/requirements-docker.txt @@ -7,24 +7,23 @@ chardet==3.0.4 colorama==0.3.9 docker-pycreds==0.3.0 docker==2.7.0 -fs==2.0.23 -graphviz==0.8.3 -idna==2.6 +fs==2.0.25 +graphviz==0.8.4 +idna==2.7 jinja2==2.10 markupsafe==1.0 mondrian==0.7.0 packaging==17.1 -pbr==4.0.4 +pbr==4.1.1 psutil==5.4.6 pyparsing==2.2.0 python-slugify==1.2.5 -pytz==2018.4 -requests==2.18.4 +pytz==2018.5 +requests==2.19.1 semantic-version==2.6.0 six==1.11.0 -stevedore==1.28.0 -typing==3.6.4 +stevedore==1.29.0 unidecode==1.0.22 -urllib3==1.22 +urllib3==1.23 websocket-client==0.48.0 whistle==1.0.1 diff --git a/requirements-jupyter.txt b/requirements-jupyter.txt index 38aef2a..7f6183f 100644 --- a/requirements-jupyter.txt +++ b/requirements-jupyter.txt @@ -10,7 +10,7 @@ ipykernel==4.8.2 ipython-genutils==0.2.0 ipython==6.4.0 ipywidgets==6.0.1 -jedi==0.12.0 +jedi==0.12.1 jinja2==2.10 jsonschema==2.6.0 jupyter-client==5.2.3 @@ -21,23 +21,24 @@ markupsafe==1.0 mistune==0.8.3 nbconvert==5.3.1 nbformat==4.4.0 -notebook==5.5.0 +notebook==5.6.0 pandocfilters==1.4.2 -parso==0.2.1 +parso==0.3.1 pexpect==4.6.0 pickleshare==0.7.4 +prometheus-client==0.3.0 prompt-toolkit==1.0.15 -ptyprocess==0.5.2 +ptyprocess==0.6.0 pygments==2.2.0 python-dateutil==2.7.3 -pyzmq==17.0.0 +pyzmq==17.1.0 qtconsole==4.3.1 send2trash==1.5.0 simplegeneric==0.8.1 six==1.11.0 terminado==0.8.1 testpath==0.3.1 -tornado==5.0.2 +tornado==5.1 traitlets==4.3.2 wcwidth==0.1.7 webencodings==0.5.1 diff --git a/requirements-sqlalchemy.txt b/requirements-sqlalchemy.txt index 8d49e86..f80fd17 100644 --- a/requirements-sqlalchemy.txt +++ b/requirements-sqlalchemy.txt @@ -5,23 +5,22 @@ bonobo-sqlalchemy==0.6.0 certifi==2018.4.16 chardet==3.0.4 colorama==0.3.9 -fs==2.0.23 -graphviz==0.8.3 -idna==2.6 +fs==2.0.25 +graphviz==0.8.4 +idna==2.7 jinja2==2.10 markupsafe==1.0 mondrian==0.7.0 packaging==17.1 -pbr==4.0.4 +pbr==4.1.1 psutil==5.4.6 pyparsing==2.2.0 python-slugify==1.2.5 -pytz==2018.4 -requests==2.18.4 +pytz==2018.5 +requests==2.19.1 six==1.11.0 -sqlalchemy==1.2.8 -stevedore==1.28.0 -typing==3.6.4 +sqlalchemy==1.2.10 +stevedore==1.29.0 unidecode==1.0.22 -urllib3==1.22 +urllib3==1.23 whistle==1.0.1 diff --git a/requirements.txt b/requirements.txt index d4696d1..faed2d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,22 +3,21 @@ appdirs==1.4.3 certifi==2018.4.16 chardet==3.0.4 colorama==0.3.9 -fs==2.0.23 -graphviz==0.8.3 -idna==2.6 +fs==2.0.25 +graphviz==0.8.4 +idna==2.7 jinja2==2.10 markupsafe==1.0 mondrian==0.7.0 packaging==17.1 -pbr==4.0.4 +pbr==4.1.1 psutil==5.4.6 pyparsing==2.2.0 python-slugify==1.2.5 -pytz==2018.4 -requests==2.18.4 +pytz==2018.5 +requests==2.19.1 six==1.11.0 -stevedore==1.28.0 -typing==3.6.4 +stevedore==1.29.0 unidecode==1.0.22 -urllib3==1.22 +urllib3==1.23 whistle==1.0.1 diff --git a/setup.py b/setup.py index f9b7806..9e0aa4b 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -# Generated by Medikit 0.6.3 on 2018-06-11. +# Generated by Medikit 0.6.3 on 2018-07-22. # All changes will be overriden. # Edit Projectfile and run “make update” (or “medikit update”) to regenerate. diff --git a/tests/execution/contexts/test_execution_contexts_graph.py b/tests/execution/contexts/test_execution_contexts_graph.py index 95b1ea4..e297c31 100644 --- a/tests/execution/contexts/test_execution_contexts_graph.py +++ b/tests/execution/contexts/test_execution_contexts_graph.py @@ -1,4 +1,5 @@ from bonobo import Graph +from bonobo.constants import EMPTY, BEGIN, END from bonobo.execution.contexts import GraphExecutionContext @@ -49,9 +50,8 @@ def test_lifecycle_of_graph_with_recoverable_error(): def test_lifecycle_of_graph_with_unrecoverable_error(): graph = Graph([1, 2, 3], raise_an_unrecoverrable_error, print) with GraphExecutionContext(graph) as context: - assert context.started - assert context.alive - assert not context.stopped + assert context.started and context.alive and not context.stopped + context.write(BEGIN, EMPTY, END) context.loop() assert context.started assert not context.alive diff --git a/tests/execution/contexts/test_node.py b/tests/execution/contexts/test_execution_contexts_node.py similarity index 100% rename from tests/execution/contexts/test_node.py rename to tests/execution/contexts/test_execution_contexts_node.py From 4e2cb29fc2a5137123a6f9e4be0d870f083aa068 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sun, 22 Jul 2018 07:54:22 +0200 Subject: [PATCH 09/13] Formatting. --- bonobo/config/configurables.py | 1 + bonobo/execution/contexts/node.py | 14 +++++++++++--- bonobo/registry.py | 2 ++ 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/bonobo/config/configurables.py b/bonobo/config/configurables.py index 98e21e7..81defaa 100644 --- a/bonobo/config/configurables.py +++ b/bonobo/config/configurables.py @@ -77,6 +77,7 @@ except ImportError: PartiallyConfigured = functools.partial else: + class PartiallyConfigured(_functools.partial): @property # TODO XXX cache this def _options_values(self): diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index 0affd29..c8d7733 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -30,6 +30,7 @@ class NodeExecutionContext(BaseContext, WithStatistics): a service implementation, or a value holder). """ + def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs=None): """ Node execution context has the responsibility fo storing the state of a transformation during its execution. @@ -92,11 +93,15 @@ class NodeExecutionContext(BaseContext, WithStatistics): # Not normal to have a partially configured object here, so let's warn the user instead of having get into # the hard trouble of understanding that by himself. raise TypeError( - 'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format(self.wrapped) + 'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format( + self.wrapped + ) ) from exc else: raise TypeError( - 'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format(self.wrapped) + 'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format( + self.wrapped + ) ) self._stack.setup(self) except Exception: @@ -128,7 +133,10 @@ class NodeExecutionContext(BaseContext, WithStatistics): self._step() except InactiveReadableError: raise - except (NotImplementedError, UnrecoverableError, ): + except ( + NotImplementedError, + UnrecoverableError, + ): self.fatal(sys.exc_info()) # exit loop except Exception: # pylint: disable=broad-except self.error(sys.exc_info()) # does not exit loop diff --git a/bonobo/registry.py b/bonobo/registry.py index fba0c53..f45da4f 100644 --- a/bonobo/registry.py +++ b/bonobo/registry.py @@ -89,6 +89,7 @@ class Registry: default_registry = Registry() + def create_reader(name, *args, format=None, registry=default_registry, **kwargs): """ Create a reader instance, guessing its factory using filename (and eventually format). @@ -103,6 +104,7 @@ def create_reader(name, *args, format=None, registry=default_registry, **kwargs) """ return registry.get_reader_factory_for(name, format=format)(name, *args, **kwargs) + def create_writer(name, *args, format=None, registry=default_registry, **kwargs): """ Create a writer instance, guessing its factory using filename (and eventually format). From e0d714cbba2559fe43269a0f36280d982e4f080b Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sun, 22 Jul 2018 07:58:30 +0200 Subject: [PATCH 10/13] bug: fix bad mistake in moving the "empty" catcher. --- bonobo/execution/contexts/node.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index 784dd8f..4c5cff7 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -123,8 +123,6 @@ class NodeExecutionContext(BaseContext, WithStatistics): self.step() except InactiveReadableError: break - except Empty: - sleep(TICK_PERIOD) # XXX: How do we determine this constant? logger.debug('Node loop ends for {!r}.'.format(self)) @@ -133,6 +131,8 @@ class NodeExecutionContext(BaseContext, WithStatistics): self._step() except InactiveReadableError: raise + except Empty: + sleep(TICK_PERIOD) # XXX: How do we determine this constant? except ( NotImplementedError, UnrecoverableError, From c449f8601e11df2d6c78e72cf8191d433fc185fe Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sun, 22 Jul 2018 11:20:08 +0200 Subject: [PATCH 11/13] implements xstatus in graph context, based on node xstatus. --- bonobo/execution/contexts/graph.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/bonobo/execution/contexts/graph.py b/bonobo/execution/contexts/graph.py index b2362db..2828592 100644 --- a/bonobo/execution/contexts/graph.py +++ b/bonobo/execution/contexts/graph.py @@ -148,3 +148,11 @@ class GraphExecutionContext(BaseContext): def unregister_plugins(self): for plugin_context in self.plugins: plugin_context.unregister() + + + @property + def xstatus(self): + """ + UNIX-like exit status, only coherent if the context has stopped. + """ + return max(node.xstatus for node in self.nodes) From 99cb02364aedcc340ebbc16f60e3a75c215cbfae Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sat, 28 Jul 2018 12:33:37 +0100 Subject: [PATCH 12/13] types: fixing type casts. --- bonobo/execution/contexts/node.py | 11 +++++------ bonobo/util/collections.py | 20 +++++++++++++++++++- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index c8d7733..13c1ff7 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -257,12 +257,11 @@ class NodeExecutionContext(BaseContext, WithStatistics): :param mixed value: message """ for message in messages: - if isinstance(message, Token): - self.input.put(message) - elif self._input_type: - self.input.put(ensure_tuple(message, cls=self._input_type)) - else: - self.input.put(ensure_tuple(message)) + if not isinstance(message, Token): + message = ensure_tuple(message, cls=self._input_type, length=self._input_length) + if self._input_length is None: + self._input_length = len(message) + self.input.put(message) def write_sync(self, *messages): self.write(BEGIN, *messages, END) diff --git a/bonobo/util/collections.py b/bonobo/util/collections.py index 3e46738..1f72e8c 100644 --- a/bonobo/util/collections.py +++ b/bonobo/util/collections.py @@ -7,7 +7,23 @@ class sortedlist(list): bisect.insort(self, x) -def ensure_tuple(tuple_or_mixed, *, cls=tuple): +def _with_length_check(f): + @functools.wraps(f) + def _wrapped(*args, length=None, **kwargs): + nonlocal f + result = f(*args, **kwargs) + if length is not None: + if length != len(result): + raise TypeError( + 'Length check failed, expected {} fields but got {}: {!r}.'.format(length, len(result), result) + ) + return result + + return _wrapped + + +@_with_length_check +def ensure_tuple(tuple_or_mixed, *, cls=None): """ If it's not a tuple, let's make a tuple of one item. Otherwise, not changed. @@ -16,6 +32,8 @@ def ensure_tuple(tuple_or_mixed, *, cls=tuple): :return: tuple """ + if cls is None: + cls = tuple if isinstance(tuple_or_mixed, cls): return tuple_or_mixed From 71fce07f99b50eaff1e43443a8f551165b3ec1f0 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sat, 28 Jul 2018 13:34:09 +0100 Subject: [PATCH 13/13] Fix xstatus when graph is empty. --- bonobo/execution/contexts/graph.py | 4 ++-- bonobo/execution/contexts/node.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/bonobo/execution/contexts/graph.py b/bonobo/execution/contexts/graph.py index 2828592..ffc9dcc 100644 --- a/bonobo/execution/contexts/graph.py +++ b/bonobo/execution/contexts/graph.py @@ -149,10 +149,10 @@ class GraphExecutionContext(BaseContext): for plugin_context in self.plugins: plugin_context.unregister() - @property def xstatus(self): """ UNIX-like exit status, only coherent if the context has stopped. + """ - return max(node.xstatus for node in self.nodes) + return max(node.xstatus for node in self.nodes) if len(self.nodes) else 0 diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index b5884a7..44476f0 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -302,9 +302,8 @@ class NodeExecutionContext(BaseContext, WithStatistics): input_bag = self._input_type(*input_bag) except Exception as exc: raise UnrecoverableTypeError( - 'Input type changed to incompatible type between calls to {!r}.\nGot {!r} which is not of type {!r}.'.format( - self.wrapped, input_bag, self._input_type - ) + 'Input type changed to incompatible type between calls to {!r}.\nGot {!r} which is not of type {!r}.'. + format(self.wrapped, input_bag, self._input_type) ) from exc # Store or check input length, which is a soft fallback in case we're just using tuples