From 8ea7ce0b1a3546b08bc559311b4c024defa99692 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sun, 29 Jul 2018 15:24:35 +0100 Subject: [PATCH] wip, aio: asyncio strategy (defunct, not fully implemented) and related refactorings. --- Makefile | 2 +- Projectfile | 1 + bonobo/examples/__init__.py | 9 ++ bonobo/examples/datasets/coffeeshops.py | 3 + bonobo/examples/datasets/fablabs.py | 23 ++++- bonobo/execution/contexts/graph.py | 107 ++++++++++++++---------- bonobo/execution/contexts/node.py | 62 ++++++++++++-- bonobo/execution/strategies/__init__.py | 4 +- bonobo/execution/strategies/executor.py | 35 ++++++++ bonobo/execution/strategies/naive.py | 19 +---- bonobo/settings.py | 3 + bonobo/structs/inputs.py | 5 ++ bonobo/util/statistics.py | 16 +--- requirements-dev.txt | 4 +- requirements.txt | 1 + setup.py | 7 +- 16 files changed, 206 insertions(+), 95 deletions(-) diff --git a/Makefile b/Makefile index 127d304..0b29ce5 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -# Generated by Medikit 0.6.3 on 2018-07-28. +# Generated by Medikit 0.6.3 on 2018-07-29. # All changes will be overriden. # Edit Projectfile and run “make update” (or “medikit update”) to regenerate. diff --git a/Projectfile b/Projectfile index ff876a0..6174a89 100644 --- a/Projectfile +++ b/Projectfile @@ -43,6 +43,7 @@ python.setup( ) python.add_requirements( + 'cached-property ~=1.4', 'fs ~=2.0', 'graphviz >=0.8,<0.9', 'jinja2 ~=2.9', diff --git a/bonobo/examples/__init__.py b/bonobo/examples/__init__.py index ec68fc5..dba9989 100644 --- a/bonobo/examples/__init__.py +++ b/bonobo/examples/__init__.py @@ -1,4 +1,5 @@ import bonobo +from bonobo.execution.strategies import STRATEGIES, DEFAULT_STRATEGY def get_argument_parser(parser=None): @@ -19,6 +20,14 @@ def get_argument_parser(parser=None): help='If set, pretty prints before writing to output file.' ) + parser.add_argument( + '--strategy', + '-s', + type=str, + choices=STRATEGIES.keys(), + default=DEFAULT_STRATEGY, + ) + return parser diff --git a/bonobo/examples/datasets/coffeeshops.py b/bonobo/examples/datasets/coffeeshops.py index 93aa0d5..714b2a1 100644 --- a/bonobo/examples/datasets/coffeeshops.py +++ b/bonobo/examples/datasets/coffeeshops.py @@ -1,3 +1,6 @@ +""" + +""" import bonobo from bonobo import examples from bonobo.contrib.opendatasoft import OpenDataSoftAPI as ODSReader diff --git a/bonobo/examples/datasets/fablabs.py b/bonobo/examples/datasets/fablabs.py index 9a02593..807440b 100644 --- a/bonobo/examples/datasets/fablabs.py +++ b/bonobo/examples/datasets/fablabs.py @@ -15,11 +15,13 @@ and a flat txt file. """ import json +import sys import bonobo from bonobo import examples from bonobo.contrib.opendatasoft import OpenDataSoftAPI from bonobo.examples.datasets.services import get_services +from bonobo.util.statistics import Timer try: import pycountry @@ -66,7 +68,20 @@ if __name__ == '__main__': parser = examples.get_argument_parser() with bonobo.parse_args(parser) as options: - bonobo.run( - get_graph(**examples.get_graph_options(options)), - services=get_services() - ) + with Timer() as timer: + print( + 'Options:', ' '.join( + '{}={}'.format(k, v) + for k, v in sorted(options.items()) + ) + ) + retval = bonobo.run( + get_graph(**examples.get_graph_options(options)), + services=get_services(), + strategy=options['strategy'], + ) + print('Execution time:', timer) + print('Return value:', retval) + print('XStatus:', retval.xstatus) + if retval.xstatus: + sys.exit(retval.xstatus) diff --git a/bonobo/execution/contexts/graph.py b/bonobo/execution/contexts/graph.py index 59c9063..2c5bbd4 100644 --- a/bonobo/execution/contexts/graph.py +++ b/bonobo/execution/contexts/graph.py @@ -8,18 +8,20 @@ from bonobo.constants import BEGIN, END, EMPTY from bonobo.errors import InactiveReadableError from bonobo.execution import events from bonobo.execution.contexts.base import BaseContext -from bonobo.execution.contexts.node import NodeExecutionContext +from bonobo.execution.contexts.node import NodeExecutionContext, AsyncNodeExecutionContext from bonobo.execution.contexts.plugin import PluginExecutionContext from whistle import EventDispatcher logger = logging.getLogger(__name__) -class GraphExecutionContext(BaseContext): +class BaseGraphExecutionContext(BaseContext): """ - Stores the actual state of a graph execution, and manages its lifecycle. + Stores the actual state of a graph execution, and manages its lifecycle. This is an abstract base class for all + graph execution contexts, and a few methods should actually be implemented for the child classes to be useable. """ + NodeExecutionContextType = NodeExecutionContext PluginExecutionContextType = PluginExecutionContext @@ -28,23 +30,31 @@ class GraphExecutionContext(BaseContext): @property def started(self): if not len(self.nodes): - return super(GraphExecutionContext, self).started + return super(BaseGraphExecutionContext, self).started return any(node.started for node in self.nodes) @property def stopped(self): if not len(self.nodes): - return super(GraphExecutionContext, self).stopped + return super(BaseGraphExecutionContext, self).stopped return all(node.started and node.stopped for node in self.nodes) @property def alive(self): if not len(self.nodes): - return super(GraphExecutionContext, self).alive + return super(BaseGraphExecutionContext, self).alive return any(node.alive for node in self.nodes) + @property + def xstatus(self): + """ + UNIX-like exit status, only coherent if the context has stopped. + + """ + return max(node.xstatus for node in self.nodes) if len(self.nodes) else 0 + def __init__(self, graph, *, plugins=None, services=None, dispatcher=None): - super(GraphExecutionContext, self).__init__(graph) + super(BaseGraphExecutionContext, self).__init__(graph) self.dispatcher = dispatcher or EventDispatcher() self.graph = graph self.nodes = [self.create_node_execution_context_for(node) for node in self.graph] @@ -58,8 +68,8 @@ class GraphExecutionContext(BaseContext): outputs = self.graph.outputs_of(i) if len(outputs): node_context.outputs = [self[j].input for j in outputs] - node_context.input.on_begin = partial(node_context._send, BEGIN, _control=True) - node_context.input.on_end = partial(node_context._send, END, _control=True) + node_context.input.on_begin = partial(node_context._put, BEGIN, _control=True) + node_context.input.on_end = partial(node_context._put, END, _control=True) node_context.input.on_finalize = partial(node_context.stop) def __getitem__(self, item): @@ -79,28 +89,32 @@ class GraphExecutionContext(BaseContext): plugin = plugin() return self.PluginExecutionContextType(plugin, parent=self) - def write(self, *messages): - """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in - our graph.""" - - for i in self.graph.outputs_of(BEGIN): - for message in messages: - self[i].write(message) - def dispatch(self, name): self.dispatcher.dispatch(name, events.ExecutionEvent(self)) + def register_plugins(self): + for plugin_context in self.plugins: + plugin_context.register() + + def unregister_plugins(self): + for plugin_context in self.plugins: + plugin_context.unregister() + + +class GraphExecutionContext(BaseGraphExecutionContext): def start(self, starter=None): super(GraphExecutionContext, self).start() self.register_plugins() self.dispatch(events.START) self.tick(pause=False) + for node in self.nodes: if starter is None: node.start() else: starter(node) + self.dispatch(events.STARTED) def tick(self, pause=True): @@ -108,22 +122,6 @@ class GraphExecutionContext(BaseContext): if pause: sleep(self.TICK_PERIOD) - def loop(self): - nodes = set(node for node in self.nodes if node.should_loop) - while self.should_loop and len(nodes): - self.tick(pause=False) - for node in list(nodes): - try: - node.step() - except Empty: - continue - except InactiveReadableError: - nodes.discard(node) - - def run_until_complete(self): - self.write(BEGIN, EMPTY, END) - self.loop() - def stop(self, stopper=None): super(GraphExecutionContext, self).stop() @@ -145,18 +143,37 @@ class GraphExecutionContext(BaseContext): node_context.kill() self.tick() - def register_plugins(self): - for plugin_context in self.plugins: - plugin_context.register() + def write(self, *messages): + """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in + our graph.""" - def unregister_plugins(self): - for plugin_context in self.plugins: - plugin_context.unregister() + for i in self.graph.outputs_of(BEGIN): + for message in messages: + self[i].write(message) - @property - def xstatus(self): - """ - UNIX-like exit status, only coherent if the context has stopped. + def loop(self): + nodes = set(node for node in self.nodes if node.should_loop) + while self.should_loop and len(nodes): + self.tick(pause=False) + for node in list(nodes): + try: + node.step() + except Empty: + continue + except InactiveReadableError: + nodes.discard(node) - """ - return max(node.xstatus for node in self.nodes) if len(self.nodes) else 0 + def run_until_complete(self): + self.write(BEGIN, EMPTY, END) + self.loop() + + +class AsyncGraphExecutionContext(GraphExecutionContext): + NodeExecutionContextType = AsyncNodeExecutionContext + + def __init__(self, *args, loop, **kwargs): + self._event_loop = loop + super().__init__(*args, **kwargs) + + def create_node_execution_context_for(self, node): + return self.NodeExecutionContextType(node, parent=self, loop=self._event_loop) diff --git a/bonobo/execution/contexts/node.py b/bonobo/execution/contexts/node.py index dd4d138..a34a784 100644 --- a/bonobo/execution/contexts/node.py +++ b/bonobo/execution/contexts/node.py @@ -10,7 +10,7 @@ from bonobo.config.processors import ContextCurrifier from bonobo.constants import BEGIN, END, TICK_PERIOD from bonobo.errors import InactiveReadableError, UnrecoverableError, UnrecoverableTypeError from bonobo.execution.contexts.base import BaseContext -from bonobo.structs.inputs import Input +from bonobo.structs.inputs import Input, AioInput from bonobo.structs.tokens import Token, Flag from bonobo.util import get_name, isconfigurabletype, ensure_tuple, deprecated from bonobo.util.bags import BagType @@ -33,6 +33,8 @@ class NodeExecutionContext(BaseContext, WithStatistics): """ + QueueType = Input + def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs=None): """ Node execution context has the responsibility fo storing the state of a transformation during its execution. @@ -57,7 +59,7 @@ class NodeExecutionContext(BaseContext, WithStatistics): self.services = None # Input / Output: how the wrapped node will communicate - self.input = _input or Input() + self.input = _input or self.QueueType() self.outputs = _outputs or [] # Types @@ -174,10 +176,10 @@ class NodeExecutionContext(BaseContext, WithStatistics): break else: # Push data (in case of an iterator) - self._send(self._cast(input_bag, result)) + self._put(self._cast(input_bag, result)) elif results: # Push data (returned value) - self._send(self._cast(input_bag, results)) + self._put(self._cast(input_bag, results)) else: # case with no result, an execution went through anyway, use for stats. # self._exec_count += 1 @@ -197,7 +199,7 @@ class NodeExecutionContext(BaseContext, WithStatistics): super().stop() def send(self, *_output, _input=None): - return self._send(self._cast(_input, _output)) + return self._put(self._cast(_input, _output)) ### Input type and fields @property @@ -324,7 +326,7 @@ class NodeExecutionContext(BaseContext, WithStatistics): def _cast(self, _input, _output): """ - Transforms a pair of input/output into the real slim output. + Transforms a pair of input/output into the real slim shoutput. :param _input: Bag :param _output: mixed @@ -355,7 +357,7 @@ class NodeExecutionContext(BaseContext, WithStatistics): return ensure_tuple(_output, cls=self._output_type) - def _send(self, value, _control=False): + def _put(self, value, _control=False): """ Sends a message to all of this context's outputs. @@ -377,6 +379,52 @@ class NodeExecutionContext(BaseContext, WithStatistics): return UnboundArguments((), {}) +class AsyncNodeExecutionContext(NodeExecutionContext): + QueueType = AioInput + + def __init__(self, *args, loop, **kwargs): + super().__init__(*args, **kwargs) + self._event_loop = loop + + async def _get(self): + """ + Read from the input queue. + + If Queue raises (like Timeout or Empty), stat won't be changed. + + """ + input_bag = await self.input.get() + + # Store or check input type + if self._input_type is None: + self._input_type = type(input_bag) + elif type(input_bag) != self._input_type: + try: + if self._input_type == tuple: + input_bag = self._input_type(input_bag) + else: + input_bag = self._input_type(*input_bag) + except Exception as exc: + raise UnrecoverableTypeError( + 'Input type changed to incompatible type between calls to {!r}.\nGot {!r} which is not of type {!r}.'. + format(self.wrapped, input_bag, self._input_type) + ) from exc + + # Store or check input length, which is a soft fallback in case we're just using tuples + if self._input_length is None: + self._input_length = len(input_bag) + elif len(input_bag) != self._input_length: + raise UnrecoverableTypeError( + 'Input length changed between calls to {!r}.\nExpected {} but got {}: {!r}.'.format( + self.wrapped, self._input_length, len(input_bag), input_bag + ) + ) + + self.increment('in') # XXX should that go before type check ? + + return input_bag + + def isflag(param): return isinstance(param, Flag) diff --git a/bonobo/execution/strategies/__init__.py b/bonobo/execution/strategies/__init__.py index 7eacbcf..638826f 100644 --- a/bonobo/execution/strategies/__init__.py +++ b/bonobo/execution/strategies/__init__.py @@ -6,7 +6,8 @@ In the future, the two strategies that would really benefit bonobo are subproces at home if you want to give it a shot. """ -from bonobo.execution.strategies.executor import ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy +from bonobo.execution.strategies.executor import ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy, \ + AsyncThreadPoolExecutorStrategy from bonobo.execution.strategies.naive import NaiveStrategy __all__ = [ @@ -17,6 +18,7 @@ STRATEGIES = { 'naive': NaiveStrategy, 'processpool': ProcessPoolExecutorStrategy, 'threadpool': ThreadPoolExecutorStrategy, + 'aio_threadpool': AsyncThreadPoolExecutorStrategy, } DEFAULT_STRATEGY = 'threadpool' diff --git a/bonobo/execution/strategies/executor.py b/bonobo/execution/strategies/executor.py index 85de43b..ae9b259 100644 --- a/bonobo/execution/strategies/executor.py +++ b/bonobo/execution/strategies/executor.py @@ -1,10 +1,16 @@ +import asyncio import functools import logging import sys from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor +from cached_property import cached_property + +from bonobo import settings from bonobo.constants import BEGIN, END +from bonobo.execution.contexts.graph import AsyncGraphExecutionContext from bonobo.execution.strategies.base import Strategy +from bonobo.util import get_name logger = logging.getLogger(__name__) @@ -73,6 +79,35 @@ class ThreadPoolExecutorStrategy(ExecutorStrategy): return self.executor_factory(max_workers=len(graph)) +class AsyncThreadPoolExecutorStrategy(ThreadPoolExecutorStrategy): + GraphExecutionContextType = AsyncGraphExecutionContext + + def __init__(self, GraphExecutionContextType=None): + if not settings.ALPHA.get(): + raise NotImplementedError( + '{} is experimental, you need to explicitely activate it using ALPHA=True in system env.'.format( + get_name(self) + ) + ) + super().__init__(GraphExecutionContextType) + + @cached_property + def loop(self): + return asyncio.get_event_loop() + + def create_graph_execution_context(self, *args, **kwargs): + return super(AsyncThreadPoolExecutorStrategy, self).create_graph_execution_context( + *args, **kwargs, loop=self.loop + ) + + def get_starter(self, executor, futures): + return functools.partial( + self.loop.run_in_executor, + executor, + super(AsyncThreadPoolExecutorStrategy, self).get_starter(executor, futures), + ) + + class ProcessPoolExecutorStrategy(ExecutorStrategy): executor_factory = ProcessPoolExecutor diff --git a/bonobo/execution/strategies/naive.py b/bonobo/execution/strategies/naive.py index 01b0416..d865231 100644 --- a/bonobo/execution/strategies/naive.py +++ b/bonobo/execution/strategies/naive.py @@ -1,4 +1,3 @@ -from bonobo.constants import BEGIN, END from bonobo.execution.strategies.base import Strategy @@ -6,20 +5,6 @@ class NaiveStrategy(Strategy): # TODO: how to run plugins in "naive" mode ? def execute(self, graph, **kwargs): - context = self.create_graph_execution_context(graph, **kwargs) - context.write(BEGIN, (), END) - - # start - context.start() - - # loop - nodes = list(context.nodes) - while len(nodes): - for node in nodes: - node.loop() - nodes = list(node for node in nodes if node.alive) - - # stop - context.stop() - + with self.create_graph_execution_context(graph, **kwargs) as context: + context.run_until_complete() return context diff --git a/bonobo/settings.py b/bonobo/settings.py index d87a329..684d18c 100644 --- a/bonobo/settings.py +++ b/bonobo/settings.py @@ -84,6 +84,9 @@ DEBUG = Setting('DEBUG', formatter=to_bool, default=False) # Profile mode. PROFILE = Setting('PROFILE', formatter=to_bool, default=False) +# Alpha mode. +ALPHA = Setting('ALPHA', formatter=to_bool, default=False) + # Quiet mode. QUIET = Setting('QUIET', formatter=to_bool, default=False) diff --git a/bonobo/structs/inputs.py b/bonobo/structs/inputs.py index 9b3cd14..646051b 100644 --- a/bonobo/structs/inputs.py +++ b/bonobo/structs/inputs.py @@ -16,6 +16,7 @@ from abc import ABCMeta, abstractmethod from queue import Queue +from asyncio.queues import Queue as AioQueue from bonobo.constants import BEGIN, END from bonobo.errors import AbstractError, InactiveReadableError, InactiveWritableError @@ -115,3 +116,7 @@ class Input(Queue, Readable, Writable): @property def alive(self): return self._runlevel > 0 + + +class AioInput(AioQueue): + pass diff --git a/bonobo/util/statistics.py b/bonobo/util/statistics.py index 31da8b9..dce2714 100644 --- a/bonobo/util/statistics.py +++ b/bonobo/util/statistics.py @@ -1,18 +1,3 @@ -# -*- coding: utf-8 -*- -# -# copyright 2012-2014 romain dorgueil -# -# licensed under the apache license, version 2.0 (the "license"); -# you may not use this file except in compliance with the license. -# you may obtain a copy of the license at -# -# http://www.apache.org/licenses/license-2.0 -# -# unless required by applicable law or agreed to in writing, software -# distributed under the license is distributed on an "as is" basis, -# without warranties or conditions of any kind, either express or implied. -# see the license for the specific language governing permissions and -# limitations under the license. import time @@ -39,6 +24,7 @@ class Timer: def __enter__(self): self.__start = time.time() + return self def __exit__(self, type=None, value=None, traceback=None): # Error handling here diff --git a/requirements-dev.txt b/requirements-dev.txt index 3b230d9..601aefd 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -20,14 +20,14 @@ jinja2==2.10 markupsafe==1.0 more-itertools==4.2.0 packaging==17.1 -pluggy==0.6.0 +pluggy==0.7.1 poyo==0.4.1 py==1.5.4 pygments==2.2.0 pyparsing==2.2.0 pytest-cov==2.5.1 pytest-timeout==1.3.1 -pytest==3.6.3 +pytest==3.6.4 python-dateutil==2.7.3 pytz==2018.5 requests==2.19.1 diff --git a/requirements.txt b/requirements.txt index 23288a3..9018ecd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ -e . appdirs==1.4.3 +cached-property==1.4.3 certifi==2018.4.16 chardet==3.0.4 colorama==0.3.9 diff --git a/setup.py b/setup.py index e7a2eb6..5e0be32 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -# Generated by Medikit 0.6.3 on 2018-07-28. +# Generated by Medikit 0.6.3 on 2018-07-29. # All changes will be overriden. # Edit Projectfile and run “make update” (or “medikit update”) to regenerate. @@ -61,8 +61,9 @@ setup( packages=find_packages(exclude=['ez_setup', 'example', 'test']), include_package_data=True, install_requires=[ - 'fs (~= 2.0)', 'graphviz (>= 0.8, < 0.9)', 'jinja2 (~= 2.9)', 'mondrian (~= 0.7)', 'packaging (~= 17.0)', - 'psutil (~= 5.4)', 'python-slugify (~= 1.2.0)', 'requests (~= 2.0)', 'stevedore (~= 1.27)', 'whistle (~= 1.0)' + 'cached-property (~= 1.4)', 'fs (~= 2.0)', 'graphviz (>= 0.8, < 0.9)', 'jinja2 (~= 2.9)', 'mondrian (~= 0.7)', + 'packaging (~= 17.0)', 'psutil (~= 5.4)', 'python-slugify (~= 1.2.0)', 'requests (~= 2.0)', + 'stevedore (~= 1.27)', 'whistle (~= 1.0)' ], extras_require={ 'dev': [