From ad502d7e238e4637848e79423ee0757f0575eb67 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Tue, 25 Apr 2017 23:05:18 +0200 Subject: [PATCH] Split execution module in submodules for better readability. --- Makefile | 2 +- Projectfile | 2 +- bonobo/execution.py | 340 ----------------------------------- bonobo/execution/__init__.py | 9 + bonobo/execution/base.py | 105 +++++++++++ bonobo/execution/graph.py | 68 +++++++ bonobo/execution/node.py | 133 ++++++++++++++ bonobo/execution/plugin.py | 34 ++++ bonobo/strategies/base.py | 2 +- bonobo/util/iterators.py | 8 +- bonobo/util/testing.py | 2 +- setup.py | 36 ++-- tests/io/test_csv.py | 2 +- tests/io/test_file.py | 2 +- tests/io/test_json.py | 2 +- tests/test_execution.py | 2 +- 16 files changed, 384 insertions(+), 365 deletions(-) delete mode 100644 bonobo/execution.py create mode 100644 bonobo/execution/__init__.py create mode 100644 bonobo/execution/base.py create mode 100644 bonobo/execution/graph.py create mode 100644 bonobo/execution/node.py create mode 100644 bonobo/execution/plugin.py diff --git a/Makefile b/Makefile index 76ff025..b8435c4 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ # This file has been auto-generated. # All changes will be lost, see Projectfile. # -# Updated at 2017-04-24 23:47:46.325867 +# Updated at 2017-04-25 23:05:05.062813 PYTHON ?= $(shell which python) PYTHON_BASENAME ?= $(shell basename $(PYTHON)) diff --git a/Projectfile b/Projectfile index 73e5ba1..c02391f 100644 --- a/Projectfile +++ b/Projectfile @@ -23,7 +23,7 @@ enable_features = { install_requires = [ 'colorama >=0.3,<0.4', 'psutil >=5.2,<5.3', - 'requests >=2.12,<2.13', + 'requests >=2.13,<2.14', 'stevedore >=1.19,<1.20', ] diff --git a/bonobo/execution.py b/bonobo/execution.py deleted file mode 100644 index c23fe8c..0000000 --- a/bonobo/execution.py +++ /dev/null @@ -1,340 +0,0 @@ -import sys -import traceback -from functools import partial -from queue import Empty -from time import sleep - -from bonobo.config import Container -from bonobo.config.processors import resolve_processors -from bonobo.constants import BEGIN, END, INHERIT_INPUT, NOT_MODIFIED -from bonobo.core.inputs import Input -from bonobo.core.statistics import WithStatistics -from bonobo.errors import InactiveReadableError -from bonobo.structs.bags import Bag, ErrorBag -from bonobo.util.iterators import ensure_tuple -from bonobo.util.objects import Wrapper - -__all__ = [ - 'GraphExecutionContext', - 'NodeExecutionContext', - 'PluginExecutionContext', -] - - -class GraphExecutionContext: - @property - def started(self): - return any(node.started for node in self.nodes) - - @property - def stopped(self): - return all(node.started and node.stopped for node in self.nodes) - - @property - def alive(self): - return any(node.alive for node in self.nodes) - - def __init__(self, graph, plugins=None, services=None): - self.graph = graph - self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph.nodes] - self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()] - self.services = Container(services) if services else Container() - - for i, component_context in enumerate(self): - try: - component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)] - except KeyError: - continue - - component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True) - component_context.input.on_end = partial(component_context.send, END, _control=True) - component_context.input.on_finalize = partial(component_context.stop) - - def __getitem__(self, item): - return self.nodes[item] - - def __len__(self): - return len(self.nodes) - - def __iter__(self): - yield from self.nodes - - def recv(self, *messages): - """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in - our graph.""" - - for i in self.graph.outputs_of(BEGIN): - for message in messages: - self[i].recv(message) - - def start(self): - # todo use strategy - for node in self.nodes: - node.start() - - def loop(self): - # todo use strategy - for node in self.nodes: - node.loop() - - def stop(self): - # todo use strategy - for node in self.nodes: - node.stop() - - -class LoopingExecutionContext(Wrapper): - alive = True - PERIOD = 0.25 - - @property - def state(self): - return self._started, self._stopped - - @property - def started(self): - return self._started - - @property - def stopped(self): - return self._stopped - - def __init__(self, wrapped, parent): - super().__init__(wrapped) - self.parent = parent - self._started, self._stopped, self._context, self._stack = False, False, None, [] - - def start(self): - assert self.state == (False, - False), ('{}.start() can only be called on a new node.').format(type(self).__name__) - assert self._context is None - self._started = True - try: - self._context = self.parent.services.args_for(self.wrapped) if self.parent else () - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - raise - - for processor in resolve_processors(self.wrapped): - try: - _processed = processor(self.wrapped, self, *self._context) - _append_to_context = next(_processed) - if _append_to_context is not None: - self._context += ensure_tuple(_append_to_context) - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - raise - self._stack.append(_processed) - - def loop(self): - """Generic loop. A bit boring. """ - while self.alive: - self.step() - sleep(self.PERIOD) - - def step(self): - """Left as an exercise for the children.""" - raise NotImplementedError('Abstract.') - - def stop(self): - assert self._started, ('{}.stop() can only be called on a previously started node.').format(type(self).__name__) - if self._stopped: - return - - assert self._context is not None - - self._stopped = True - while len(self._stack): - processor = self._stack.pop() - try: - # todo yield from ? how to ? - next(processor) - except StopIteration as exc: - # This is normal, and wanted. - pass - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - raise - else: - # No error ? We should have had StopIteration ... - raise RuntimeError('Context processors should not yield more than once.') - - def handle_error(self, exc, trace): - """ - Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception - or somehow make me think it is an exception, I'll handle it. - - :param exc: the culprit - :param trace: Hercule Poirot's logbook. - :return: to hell - """ - - from colorama import Fore, Style - print( - Style.BRIGHT, - Fore.RED, - '\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped), - Style.RESET_ALL, - sep='', - file=sys.stderr, - ) - print(trace) - - -class NodeExecutionContext(WithStatistics, LoopingExecutionContext): - """ - todo: make the counter dependant of parent context? - """ - - @property - def alive(self): - """todo check if this is right, and where it is used""" - return self.input.alive and self._started and not self._stopped - - def __init__(self, wrapped, parent): - LoopingExecutionContext.__init__(self, wrapped, parent) - WithStatistics.__init__(self, 'in', 'out', 'err') - - self.input = Input() - self.outputs = [] - - def __str__(self): - return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip() - - def __repr__(self): - return '<' + self.__str__() + '>' - - def recv(self, *messages): - """ - Push a message list to this context's input queue. - - :param mixed value: message - """ - for message in messages: - self.input.put(message) - - def send(self, value, _control=False): - """ - Sends a message to all of this context's outputs. - - :param mixed value: message - :param _control: if true, won't count in statistics. - """ - if not _control: - self.increment('out') - for output in self.outputs: - output.put(value) - - def get(self): - """ - Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed. - - """ - row = self.input.get(timeout=self.PERIOD) - self.increment('in') - return row - - def loop(self): - while True: - try: - self.step() - except KeyboardInterrupt: - raise - except InactiveReadableError: - break - except Empty: - sleep(self.PERIOD) - continue - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - - def step(self): - # Pull data from the first available input channel. - """Runs a transformation callable with given args/kwargs and flush the result into the right - output channel.""" - - input_bag = self.get() - - # todo add timer - self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context)) - - def handle_results(self, input_bag, results): - # self._exec_time += timer.duration - # Put data onto output channels - try: - results = _iter(results) - except TypeError: # not an iterator - if results: - if isinstance(results, ErrorBag): - results.apply(self.handle_error) - else: - self.send(_resolve(input_bag, results)) - else: - # case with no result, an execution went through anyway, use for stats. - # self._exec_count += 1 - pass - else: - while True: # iterator - try: - output = next(results) - except StopIteration: - break - else: - if isinstance(output, ErrorBag): - output.apply(self.handle_error) - else: - self.send(_resolve(input_bag, output)) - - -class PluginExecutionContext(LoopingExecutionContext): - PERIOD = 0.5 - - def __init__(self, wrapped, parent): - # Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure - # plugins, for example if it depends on an external service. - super().__init__(wrapped(self), parent) - - def start(self): - super().start() - - try: - self.wrapped.initialize() - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - - def shutdown(self): - try: - self.wrapped.finalize() - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - finally: - self.alive = False - - def step(self): - try: - self.wrapped.run() - except Exception as exc: # pylint: disable=broad-except - self.handle_error(exc, traceback.format_exc()) - - -def _iter(mixed): - if isinstance(mixed, (dict, list, str)): - raise TypeError(type(mixed).__name__) - return iter(mixed) - - -def _resolve(input_bag, output): - # NotModified means to send the input unmodified to output. - if output is NOT_MODIFIED: - return input_bag - - # If it does not look like a bag, let's create one for easier manipulation - if hasattr(output, 'apply'): - # Already a bag? Check if we need to set parent. - if INHERIT_INPUT in output.flags: - output.set_parent(input_bag) - else: - # Not a bag? Let's encapsulate it. - output = Bag(output) - - return output diff --git a/bonobo/execution/__init__.py b/bonobo/execution/__init__.py new file mode 100644 index 0000000..aaf4ba3 --- /dev/null +++ b/bonobo/execution/__init__.py @@ -0,0 +1,9 @@ +from bonobo.execution.graph import GraphExecutionContext, NodeExecutionContext, PluginExecutionContext + +__all__ = [ + 'GraphExecutionContext', + 'NodeExecutionContext', + 'PluginExecutionContext', +] + + diff --git a/bonobo/execution/base.py b/bonobo/execution/base.py new file mode 100644 index 0000000..b84cb70 --- /dev/null +++ b/bonobo/execution/base.py @@ -0,0 +1,105 @@ +import sys +import traceback +from time import sleep + +from bonobo.config.processors import resolve_processors +from bonobo.util.iterators import ensure_tuple +from bonobo.util.objects import Wrapper + + +class LoopingExecutionContext(Wrapper): + alive = True + PERIOD = 0.25 + + @property + def state(self): + return self._started, self._stopped + + @property + def started(self): + return self._started + + @property + def stopped(self): + return self._stopped + + def __init__(self, wrapped, parent): + super().__init__(wrapped) + self.parent = parent + self._started, self._stopped, self._context, self._stack = False, False, None, [] + + def start(self): + assert self.state == (False, + False), ('{}.start() can only be called on a new node.').format(type(self).__name__) + assert self._context is None + self._started = True + try: + self._context = self.parent.services.args_for(self.wrapped) if self.parent else () + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + raise + + for processor in resolve_processors(self.wrapped): + try: + _processed = processor(self.wrapped, self, *self._context) + _append_to_context = next(_processed) + if _append_to_context is not None: + self._context += ensure_tuple(_append_to_context) + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + raise + self._stack.append(_processed) + + def loop(self): + """Generic loop. A bit boring. """ + while self.alive: + self.step() + sleep(self.PERIOD) + + def step(self): + """Left as an exercise for the children.""" + raise NotImplementedError('Abstract.') + + def stop(self): + assert self._started, ('{}.stop() can only be called on a previously started node.').format(type(self).__name__) + if self._stopped: + return + + assert self._context is not None + + self._stopped = True + while len(self._stack): + processor = self._stack.pop() + try: + # todo yield from ? how to ? + next(processor) + except StopIteration as exc: + # This is normal, and wanted. + pass + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + raise + else: + # No error ? We should have had StopIteration ... + raise RuntimeError('Context processors should not yield more than once.') + + def handle_error(self, exc, trace): + """ + Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception + or somehow make me think it is an exception, I'll handle it. + + :param exc: the culprit + :param trace: Hercule Poirot's logbook. + :return: to hell + """ + + from colorama import Fore, Style + print( + Style.BRIGHT, + Fore.RED, + '\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped), + Style.RESET_ALL, + sep='', + file=sys.stderr, + ) + print(trace) \ No newline at end of file diff --git a/bonobo/execution/graph.py b/bonobo/execution/graph.py new file mode 100644 index 0000000..d5e241f --- /dev/null +++ b/bonobo/execution/graph.py @@ -0,0 +1,68 @@ +from functools import partial + +from bonobo.config.services import Container +from bonobo.constants import BEGIN, END +from bonobo.execution.node import NodeExecutionContext +from bonobo.execution.plugin import PluginExecutionContext + + +class GraphExecutionContext: + @property + def started(self): + return any(node.started for node in self.nodes) + + @property + def stopped(self): + return all(node.started and node.stopped for node in self.nodes) + + @property + def alive(self): + return any(node.alive for node in self.nodes) + + def __init__(self, graph, plugins=None, services=None): + self.graph = graph + self.nodes = [NodeExecutionContext(node, parent=self) for node in self.graph.nodes] + self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()] + self.services = Container(services) if services else Container() + + for i, component_context in enumerate(self): + try: + component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)] + except KeyError: + continue + + component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True) + component_context.input.on_end = partial(component_context.send, END, _control=True) + component_context.input.on_finalize = partial(component_context.stop) + + def __getitem__(self, item): + return self.nodes[item] + + def __len__(self): + return len(self.nodes) + + def __iter__(self): + yield from self.nodes + + def recv(self, *messages): + """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in + our graph.""" + + for i in self.graph.outputs_of(BEGIN): + for message in messages: + self[i].recv(message) + + def start(self): + # todo use strategy + for node in self.nodes: + node.start() + + def loop(self): + # todo use strategy + for node in self.nodes: + node.loop() + + def stop(self): + # todo use strategy + for node in self.nodes: + node.stop() diff --git a/bonobo/execution/node.py b/bonobo/execution/node.py new file mode 100644 index 0000000..0657972 --- /dev/null +++ b/bonobo/execution/node.py @@ -0,0 +1,133 @@ +import traceback +from queue import Empty +from time import sleep + +from bonobo.structs.bags import Bag, ErrorBag +from bonobo.constants import INHERIT_INPUT, NOT_MODIFIED +from bonobo.core.inputs import Input +from bonobo.core.statistics import WithStatistics +from bonobo.errors import InactiveReadableError +from bonobo.execution.base import LoopingExecutionContext +from bonobo.util.iterators import iter_if_not_sequence + + +class NodeExecutionContext(WithStatistics, LoopingExecutionContext): + """ + todo: make the counter dependant of parent context? + """ + + @property + def alive(self): + """todo check if this is right, and where it is used""" + return self.input.alive and self._started and not self._stopped + + def __init__(self, wrapped, parent): + LoopingExecutionContext.__init__(self, wrapped, parent) + WithStatistics.__init__(self, 'in', 'out', 'err') + + self.input = Input() + self.outputs = [] + + def __str__(self): + return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip() + + def __repr__(self): + return '<' + self.__str__() + '>' + + def recv(self, *messages): + """ + Push a message list to this context's input queue. + + :param mixed value: message + """ + for message in messages: + self.input.put(message) + + def send(self, value, _control=False): + """ + Sends a message to all of this context's outputs. + + :param mixed value: message + :param _control: if true, won't count in statistics. + """ + if not _control: + self.increment('out') + for output in self.outputs: + output.put(value) + + def get(self): + """ + Get from the queue first, then increment stats, so if Queue raise Timeout or Empty, stat won't be changed. + + """ + row = self.input.get(timeout=self.PERIOD) + self.increment('in') + return row + + def loop(self): + while True: + try: + self.step() + except KeyboardInterrupt: + raise + except InactiveReadableError: + break + except Empty: + sleep(self.PERIOD) + continue + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + + def step(self): + # Pull data from the first available input channel. + """Runs a transformation callable with given args/kwargs and flush the result into the right + output channel.""" + + input_bag = self.get() + + # todo add timer + self.handle_results(input_bag, input_bag.apply(self.wrapped, *self._context)) + + def handle_results(self, input_bag, results): + # self._exec_time += timer.duration + # Put data onto output channels + try: + results = iter_if_not_sequence(results) + except TypeError: # not an iterator + if results: + if isinstance(results, ErrorBag): + results.apply(self.handle_error) + else: + self.send(_resolve(input_bag, results)) + else: + # case with no result, an execution went through anyway, use for stats. + # self._exec_count += 1 + pass + else: + while True: # iterator + try: + output = next(results) + except StopIteration: + break + else: + if isinstance(output, ErrorBag): + output.apply(self.handle_error) + else: + self.send(_resolve(input_bag, output)) + + +def _resolve(input_bag, output): + # NotModified means to send the input unmodified to output. + if output is NOT_MODIFIED: + return input_bag + + # If it does not look like a bag, let's create one for easier manipulation + if hasattr(output, 'apply'): + # Already a bag? Check if we need to set parent. + if INHERIT_INPUT in output.flags: + output.set_parent(input_bag) + else: + # Not a bag? Let's encapsulate it. + output = Bag(output) + + return output diff --git a/bonobo/execution/plugin.py b/bonobo/execution/plugin.py new file mode 100644 index 0000000..ec1c5bd --- /dev/null +++ b/bonobo/execution/plugin.py @@ -0,0 +1,34 @@ +import traceback + +from bonobo.execution.base import LoopingExecutionContext + + +class PluginExecutionContext(LoopingExecutionContext): + PERIOD = 0.5 + + def __init__(self, wrapped, parent): + # Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure + # plugins, for example if it depends on an external service. + super().__init__(wrapped(self), parent) + + def start(self): + super().start() + + try: + self.wrapped.initialize() + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + + def shutdown(self): + try: + self.wrapped.finalize() + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) + finally: + self.alive = False + + def step(self): + try: + self.wrapped.run() + except Exception as exc: # pylint: disable=broad-except + self.handle_error(exc, traceback.format_exc()) \ No newline at end of file diff --git a/bonobo/strategies/base.py b/bonobo/strategies/base.py index 7758241..4b345d4 100644 --- a/bonobo/strategies/base.py +++ b/bonobo/strategies/base.py @@ -1,4 +1,4 @@ -from bonobo.execution import GraphExecutionContext +from bonobo.execution.graph import GraphExecutionContext class Strategy: diff --git a/bonobo/util/iterators.py b/bonobo/util/iterators.py index 0a79f96..cdf02a7 100644 --- a/bonobo/util/iterators.py +++ b/bonobo/util/iterators.py @@ -20,4 +20,10 @@ def force_iterator(mixed): def ensure_tuple(tuple_or_mixed): if isinstance(tuple_or_mixed, tuple): return tuple_or_mixed - return (tuple_or_mixed,) \ No newline at end of file + return (tuple_or_mixed,) + + +def iter_if_not_sequence(mixed): + if isinstance(mixed, (dict, list, str)): + raise TypeError(type(mixed).__name__) + return iter(mixed) \ No newline at end of file diff --git a/bonobo/util/testing.py b/bonobo/util/testing.py index 6f3fa57..ca400cc 100644 --- a/bonobo/util/testing.py +++ b/bonobo/util/testing.py @@ -1,6 +1,6 @@ from unittest.mock import MagicMock -from bonobo.execution import NodeExecutionContext +from bonobo.execution.node import NodeExecutionContext class CapturingNodeExecutionContext(NodeExecutionContext): diff --git a/setup.py b/setup.py index 211bc09..6bd307a 100644 --- a/setup.py +++ b/setup.py @@ -40,36 +40,40 @@ setup( name='bonobo', description='Bonobo', license='Apache License, Version 2.0', - install_requires=['colorama >=0.3,<0.4', 'psutil >=5.2,<5.3', 'requests >=2.12,<2.13', 'stevedore >=1.19,<1.20'], + install_requires=[ + 'colorama >=0.3,<0.4', 'psutil >=5.2,<5.3', 'requests >=2.13,<2.14', + 'stevedore >=1.19,<1.20' + ], version=version, long_description=read('README.rst'), classifiers=read('classifiers.txt', tolines), packages=find_packages(exclude=['ez_setup', 'example', 'test']), include_package_data=True, - data_files=[ - ( - 'share/jupyter/nbextensions/bonobo-jupyter', [ - 'bonobo/ext/jupyter/static/extension.js', 'bonobo/ext/jupyter/static/index.js', - 'bonobo/ext/jupyter/static/index.js.map' - ] - ) - ], + data_files=[('share/jupyter/nbextensions/bonobo-jupyter', [ + 'bonobo/ext/jupyter/static/extension.js', + 'bonobo/ext/jupyter/static/index.js', + 'bonobo/ext/jupyter/static/index.js.map' + ])], extras_require={ 'dev': [ - 'coverage >=4.3,<4.4', 'mock >=2.0,<2.1', 'nose >=1.3,<1.4', 'pylint >=1.6,<1.7', 'pytest >=3,<4', - 'pytest-cov >=2.4,<2.5', 'pytest-timeout >=1.2,<1.3', 'sphinx', 'sphinx_rtd_theme', 'yapf' + 'coverage >=4.3,<4.4', 'mock >=2.0,<2.1', 'pylint >=1,<2', + 'pytest >=3,<4', 'pytest-cov >=2,<3', 'pytest-timeout >=1,<2', + 'sphinx', 'sphinx_rtd_theme', 'yapf' ], 'jupyter': ['jupyter >=1.0,<1.1', 'ipywidgets >=6.0.0.beta5'] }, entry_points={ 'bonobo.commands': [ - 'init = bonobo.commands.init:register', 'run = bonobo.commands.run:register', + 'init = bonobo.commands.init:register', + 'run = bonobo.commands.run:register', 'version = bonobo.commands.version:register' ], 'console_scripts': ['bonobo = bonobo.commands:entrypoint'], - 'edgy.project.features': ['bonobo = ' - 'bonobo.ext.edgy.project.feature:BonoboFeature'] + 'edgy.project.features': + ['bonobo = ' + 'bonobo.ext.edgy.project.feature:BonoboFeature'] }, url='https://www.bonobo-project.org/', - download_url='https://github.com/python-bonobo/bonobo/tarball/{version}'.format(version=version), -) + download_url= + 'https://github.com/python-bonobo/bonobo/tarball/{version}'.format( + version=version), ) diff --git a/tests/io/test_csv.py b/tests/io/test_csv.py index 87904a9..0e09a34 100644 --- a/tests/io/test_csv.py +++ b/tests/io/test_csv.py @@ -2,7 +2,7 @@ import pytest from bonobo import Bag, CsvReader, CsvWriter from bonobo.constants import BEGIN, END -from bonobo.execution import NodeExecutionContext +from bonobo.execution.node import NodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext diff --git a/tests/io/test_file.py b/tests/io/test_file.py index 234a323..71f6785 100644 --- a/tests/io/test_file.py +++ b/tests/io/test_file.py @@ -2,7 +2,7 @@ import pytest from bonobo import Bag, FileReader, FileWriter from bonobo.constants import BEGIN, END -from bonobo.execution import NodeExecutionContext +from bonobo.execution.node import NodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext diff --git a/tests/io/test_json.py b/tests/io/test_json.py index 8f6d84b..1c8c124 100644 --- a/tests/io/test_json.py +++ b/tests/io/test_json.py @@ -2,7 +2,7 @@ import pytest from bonobo import Bag, JsonReader, JsonWriter from bonobo.constants import BEGIN, END -from bonobo.execution import NodeExecutionContext +from bonobo.execution.node import NodeExecutionContext from bonobo.util.testing import CapturingNodeExecutionContext diff --git a/tests/test_execution.py b/tests/test_execution.py index dafd7e1..e2fe1c2 100644 --- a/tests/test_execution.py +++ b/tests/test_execution.py @@ -1,7 +1,7 @@ from bonobo import Graph, NaiveStrategy, Bag from bonobo.config.processors import contextual from bonobo.constants import BEGIN, END -from bonobo.execution import GraphExecutionContext +from bonobo.execution.graph import GraphExecutionContext def generate_integers():