wip, aio: asyncio strategy (defunct, not fully implemented) and related refactorings.

Author: Romain Dorgueil
Date:   2018-07-29 15:24:35 +01:00
Parent: 980a76399b
Commit: 8ea7ce0b1a
16 changed files with 206 additions and 95 deletions
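
For orientation, a minimal sketch of how the new 'aio_threadpool' strategy registered in this commit would be selected. This is illustrative only: the tiny graph is not part of the commit, the environment-variable mechanics are an assumption (that the ALPHA setting reads the environment lazily), and per the commit message the strategy was never fully implemented.

# Hypothetical usage of the experimental strategy added in this commit.
import os

os.environ['ALPHA'] = 'True'  # assumption: settings.ALPHA picks this up on first .get()

import bonobo


def extract():
    yield 'hello'
    yield 'world'


graph = bonobo.Graph(extract, print)  # illustrative graph, not from the commit

if __name__ == '__main__':
    # 'aio_threadpool' is the name registered in STRATEGIES below; defunct, so expect rough edges.
    bonobo.run(graph, strategy='aio_threadpool')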

View File

@@ -1,4 +1,4 @@
-# Generated by Medikit 0.6.3 on 2018-07-28.
+# Generated by Medikit 0.6.3 on 2018-07-29.
 # All changes will be overriden.
 # Edit Projectfile and run “make update” (or “medikit update”) to regenerate.

View File

@@ -43,6 +43,7 @@ python.setup(
 )
 python.add_requirements(
+    'cached-property ~=1.4',
     'fs ~=2.0',
     'graphviz >=0.8,<0.9',
     'jinja2 ~=2.9',

View File

@@ -1,4 +1,5 @@
 import bonobo
+from bonobo.execution.strategies import STRATEGIES, DEFAULT_STRATEGY
 def get_argument_parser(parser=None):
@@ -19,6 +20,14 @@ def get_argument_parser(parser=None):
         help='If set, pretty prints before writing to output file.'
     )
+    parser.add_argument(
+        '--strategy',
+        '-s',
+        type=str,
+        choices=STRATEGIES.keys(),
+        default=DEFAULT_STRATEGY,
+    )
     return parser
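
With the new option in place, the bundled examples can pick any registered strategy from the command line. A minimal sketch of the intended wiring, reusing the same get_argument_parser / parse_args helpers the examples in this commit use; the trivial graph is an illustration, not part of the commit:

import bonobo
from bonobo import examples


def get_graph():
    return bonobo.Graph(lambda: 'hello', print)  # illustrative graph


if __name__ == '__main__':
    parser = examples.get_argument_parser()
    with bonobo.parse_args(parser) as options:
        # options['strategy'] defaults to DEFAULT_STRATEGY ('threadpool'),
        # e.g. run the example with: --strategy naive
        bonobo.run(get_graph(), strategy=options['strategy'])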

View File

@@ -1,3 +1,6 @@
+"""
+"""
 import bonobo
 from bonobo import examples
 from bonobo.contrib.opendatasoft import OpenDataSoftAPI as ODSReader

View File

@@ -15,11 +15,13 @@ and a flat txt file.
 """
 import json
+import sys
 import bonobo
 from bonobo import examples
 from bonobo.contrib.opendatasoft import OpenDataSoftAPI
 from bonobo.examples.datasets.services import get_services
+from bonobo.util.statistics import Timer
 try:
     import pycountry
@@ -66,7 +68,20 @@ if __name__ == '__main__':
     parser = examples.get_argument_parser()
     with bonobo.parse_args(parser) as options:
-        bonobo.run(
-            get_graph(**examples.get_graph_options(options)),
-            services=get_services()
-        )
+        with Timer() as timer:
+            print(
+                'Options:', ' '.join(
+                    '{}={}'.format(k, v)
+                    for k, v in sorted(options.items())
+                )
+            )
+            retval = bonobo.run(
+                get_graph(**examples.get_graph_options(options)),
+                services=get_services(),
+                strategy=options['strategy'],
+            )
+        print('Execution time:', timer)
+        print('Return value:', retval)
+        print('XStatus:', retval.xstatus)
+        if retval.xstatus:
+            sys.exit(retval.xstatus)

View File

@@ -8,18 +8,20 @@ from bonobo.constants import BEGIN, END, EMPTY
 from bonobo.errors import InactiveReadableError
 from bonobo.execution import events
 from bonobo.execution.contexts.base import BaseContext
-from bonobo.execution.contexts.node import NodeExecutionContext
+from bonobo.execution.contexts.node import NodeExecutionContext, AsyncNodeExecutionContext
 from bonobo.execution.contexts.plugin import PluginExecutionContext
 from whistle import EventDispatcher
 logger = logging.getLogger(__name__)
-class GraphExecutionContext(BaseContext):
+class BaseGraphExecutionContext(BaseContext):
     """
-    Stores the actual state of a graph execution, and manages its lifecycle.
+    Stores the actual state of a graph execution, and manages its lifecycle. This is an abstract base class for all
+    graph execution contexts, and a few methods should actually be implemented for the child classes to be useable.
     """
     NodeExecutionContextType = NodeExecutionContext
     PluginExecutionContextType = PluginExecutionContext
@@ -28,23 +30,31 @@ class GraphExecutionContext(BaseContext):
     @property
     def started(self):
         if not len(self.nodes):
-            return super(GraphExecutionContext, self).started
+            return super(BaseGraphExecutionContext, self).started
         return any(node.started for node in self.nodes)
     @property
     def stopped(self):
         if not len(self.nodes):
-            return super(GraphExecutionContext, self).stopped
+            return super(BaseGraphExecutionContext, self).stopped
         return all(node.started and node.stopped for node in self.nodes)
     @property
     def alive(self):
         if not len(self.nodes):
-            return super(GraphExecutionContext, self).alive
+            return super(BaseGraphExecutionContext, self).alive
         return any(node.alive for node in self.nodes)
+    @property
+    def xstatus(self):
+        """
+        UNIX-like exit status, only coherent if the context has stopped.
+        """
+        return max(node.xstatus for node in self.nodes) if len(self.nodes) else 0
     def __init__(self, graph, *, plugins=None, services=None, dispatcher=None):
-        super(GraphExecutionContext, self).__init__(graph)
+        super(BaseGraphExecutionContext, self).__init__(graph)
         self.dispatcher = dispatcher or EventDispatcher()
         self.graph = graph
         self.nodes = [self.create_node_execution_context_for(node) for node in self.graph]
@@ -58,8 +68,8 @@ class GraphExecutionContext(BaseContext):
             outputs = self.graph.outputs_of(i)
             if len(outputs):
                 node_context.outputs = [self[j].input for j in outputs]
-            node_context.input.on_begin = partial(node_context._send, BEGIN, _control=True)
-            node_context.input.on_end = partial(node_context._send, END, _control=True)
+            node_context.input.on_begin = partial(node_context._put, BEGIN, _control=True)
+            node_context.input.on_end = partial(node_context._put, END, _control=True)
             node_context.input.on_finalize = partial(node_context.stop)
     def __getitem__(self, item):
@@ -79,28 +89,32 @@ class GraphExecutionContext(BaseContext):
             plugin = plugin()
         return self.PluginExecutionContextType(plugin, parent=self)
-    def write(self, *messages):
-        """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in
-        our graph."""
-        for i in self.graph.outputs_of(BEGIN):
-            for message in messages:
-                self[i].write(message)
     def dispatch(self, name):
         self.dispatcher.dispatch(name, events.ExecutionEvent(self))
+    def register_plugins(self):
+        for plugin_context in self.plugins:
+            plugin_context.register()
+    def unregister_plugins(self):
+        for plugin_context in self.plugins:
+            plugin_context.unregister()
+class GraphExecutionContext(BaseGraphExecutionContext):
     def start(self, starter=None):
         super(GraphExecutionContext, self).start()
         self.register_plugins()
         self.dispatch(events.START)
         self.tick(pause=False)
         for node in self.nodes:
             if starter is None:
                 node.start()
             else:
                 starter(node)
         self.dispatch(events.STARTED)
     def tick(self, pause=True):
@@ -108,22 +122,6 @@ class GraphExecutionContext(BaseContext):
         if pause:
             sleep(self.TICK_PERIOD)
-    def loop(self):
-        nodes = set(node for node in self.nodes if node.should_loop)
-        while self.should_loop and len(nodes):
-            self.tick(pause=False)
-            for node in list(nodes):
-                try:
-                    node.step()
-                except Empty:
-                    continue
-                except InactiveReadableError:
-                    nodes.discard(node)
-    def run_until_complete(self):
-        self.write(BEGIN, EMPTY, END)
-        self.loop()
     def stop(self, stopper=None):
         super(GraphExecutionContext, self).stop()
@@ -145,18 +143,37 @@ class GraphExecutionContext(BaseContext):
             node_context.kill()
         self.tick()
-    def register_plugins(self):
-        for plugin_context in self.plugins:
-            plugin_context.register()
-    def unregister_plugins(self):
-        for plugin_context in self.plugins:
-            plugin_context.unregister()
-    @property
-    def xstatus(self):
-        """
-        UNIX-like exit status, only coherent if the context has stopped.
-        """
-        return max(node.xstatus for node in self.nodes) if len(self.nodes) else 0
+    def write(self, *messages):
+        """Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in
+        our graph."""
+        for i in self.graph.outputs_of(BEGIN):
+            for message in messages:
+                self[i].write(message)
+    def loop(self):
+        nodes = set(node for node in self.nodes if node.should_loop)
+        while self.should_loop and len(nodes):
+            self.tick(pause=False)
+            for node in list(nodes):
+                try:
+                    node.step()
+                except Empty:
+                    continue
+                except InactiveReadableError:
+                    nodes.discard(node)
+    def run_until_complete(self):
+        self.write(BEGIN, EMPTY, END)
+        self.loop()
+class AsyncGraphExecutionContext(GraphExecutionContext):
+    NodeExecutionContextType = AsyncNodeExecutionContext
+    def __init__(self, *args, loop, **kwargs):
+        self._event_loop = loop
+        super().__init__(*args, **kwargs)
+    def create_node_execution_context_for(self, node):
+        return self.NodeExecutionContextType(node, parent=self, loop=self._event_loop)
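
The new xstatus property reduces per-node exit statuses with max(), so any failing node drives the graph's overall exit code; this is the value the example above hands to sys.exit. A plain-Python illustration of that aggregation rule, using hypothetical status values:

# Not bonobo API, just the max() semantics of BaseGraphExecutionContext.xstatus.
node_statuses = [0, 0, 70]  # hypothetical per-node exit statuses
graph_xstatus = max(node_statuses) if node_statuses else 0
assert graph_xstatus == 70  # one failing node is enough to fail the whole graph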

View File

@@ -10,7 +10,7 @@ from bonobo.config.processors import ContextCurrifier
 from bonobo.constants import BEGIN, END, TICK_PERIOD
 from bonobo.errors import InactiveReadableError, UnrecoverableError, UnrecoverableTypeError
 from bonobo.execution.contexts.base import BaseContext
-from bonobo.structs.inputs import Input
+from bonobo.structs.inputs import Input, AioInput
 from bonobo.structs.tokens import Token, Flag
 from bonobo.util import get_name, isconfigurabletype, ensure_tuple, deprecated
 from bonobo.util.bags import BagType
@@ -33,6 +33,8 @@ class NodeExecutionContext(BaseContext, WithStatistics):
     """
+    QueueType = Input
     def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs=None):
         """
         Node execution context has the responsibility fo storing the state of a transformation during its execution.
@@ -57,7 +59,7 @@ class NodeExecutionContext(BaseContext, WithStatistics):
         self.services = None
         # Input / Output: how the wrapped node will communicate
-        self.input = _input or Input()
+        self.input = _input or self.QueueType()
         self.outputs = _outputs or []
         # Types
@@ -174,10 +176,10 @@ class NodeExecutionContext(BaseContext, WithStatistics):
                     break
                 else:
                     # Push data (in case of an iterator)
-                    self._send(self._cast(input_bag, result))
+                    self._put(self._cast(input_bag, result))
         elif results:
             # Push data (returned value)
-            self._send(self._cast(input_bag, results))
+            self._put(self._cast(input_bag, results))
         else:
             # case with no result, an execution went through anyway, use for stats.
             # self._exec_count += 1
@@ -197,7 +199,7 @@ class NodeExecutionContext(BaseContext, WithStatistics):
         super().stop()
     def send(self, *_output, _input=None):
-        return self._send(self._cast(_input, _output))
+        return self._put(self._cast(_input, _output))
     ### Input type and fields
     @property
@@ -324,7 +326,7 @@ class NodeExecutionContext(BaseContext, WithStatistics):
     def _cast(self, _input, _output):
         """
-        Transforms a pair of input/output into the real slim output.
+        Transforms a pair of input/output into the real slim shoutput.
         :param _input: Bag
         :param _output: mixed
@@ -355,7 +357,7 @@ class NodeExecutionContext(BaseContext, WithStatistics):
         return ensure_tuple(_output, cls=self._output_type)
-    def _send(self, value, _control=False):
+    def _put(self, value, _control=False):
         """
         Sends a message to all of this context's outputs.
@@ -377,6 +379,52 @@ class NodeExecutionContext(BaseContext, WithStatistics):
         return UnboundArguments((), {})
+class AsyncNodeExecutionContext(NodeExecutionContext):
+    QueueType = AioInput
+    def __init__(self, *args, loop, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._event_loop = loop
+    async def _get(self):
+        """
+        Read from the input queue.
+        If Queue raises (like Timeout or Empty), stat won't be changed.
+        """
+        input_bag = await self.input.get()
+        # Store or check input type
+        if self._input_type is None:
+            self._input_type = type(input_bag)
+        elif type(input_bag) != self._input_type:
+            try:
+                if self._input_type == tuple:
+                    input_bag = self._input_type(input_bag)
+                else:
+                    input_bag = self._input_type(*input_bag)
+            except Exception as exc:
+                raise UnrecoverableTypeError(
+                    'Input type changed to incompatible type between calls to {!r}.\nGot {!r} which is not of type {!r}.'.
+                    format(self.wrapped, input_bag, self._input_type)
+                ) from exc
+        # Store or check input length, which is a soft fallback in case we're just using tuples
+        if self._input_length is None:
+            self._input_length = len(input_bag)
+        elif len(input_bag) != self._input_length:
+            raise UnrecoverableTypeError(
+                'Input length changed between calls to {!r}.\nExpected {} but got {}: {!r}.'.format(
+                    self.wrapped, self._input_length, len(input_bag), input_bag
+                )
+            )
+        self.increment('in')  # XXX should that go before type check ?
+        return input_bag
 def isflag(param):
     return isinstance(param, Flag)
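
AsyncNodeExecutionContext only swaps the queue type and adds an awaitable _get; nothing in this commit visibly awaits it, which is presumably why the commit message calls the strategy defunct. A small sketch of the asyncio queue the async context would read from (the surrounding wiring is an assumption, not from the commit; AioInput itself is added further down, in bonobo.structs.inputs):

import asyncio

from bonobo.structs.inputs import AioInput  # plain asyncio.Queue subclass added in this commit


async def demo():
    queue = AioInput()
    await queue.put(('hello',))  # a producer would push bags like this
    print(await queue.get())     # this is what AsyncNodeExecutionContext._get awaits internally


asyncio.run(demo())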

View File

@@ -6,7 +6,8 @@ In the future, the two strategies that would really benefit bonobo are subprocess
 at home if you want to give it a shot.
 """
-from bonobo.execution.strategies.executor import ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy
+from bonobo.execution.strategies.executor import ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy, \
+    AsyncThreadPoolExecutorStrategy
 from bonobo.execution.strategies.naive import NaiveStrategy
 __all__ = [
@@ -17,6 +18,7 @@ STRATEGIES = {
     'naive': NaiveStrategy,
     'processpool': ProcessPoolExecutorStrategy,
     'threadpool': ThreadPoolExecutorStrategy,
+    'aio_threadpool': AsyncThreadPoolExecutorStrategy,
 }
 DEFAULT_STRATEGY = 'threadpool'
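
The registry maps the names accepted by the examples' --strategy flag (and by bonobo.run's strategy argument) to strategy classes; the new entry is importable but gated behind the ALPHA setting checked in the next file. For instance:

from bonobo.execution.strategies import STRATEGIES, DEFAULT_STRATEGY

print(sorted(STRATEGIES))  # ['aio_threadpool', 'naive', 'processpool', 'threadpool']
print(DEFAULT_STRATEGY)    # 'threadpool'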

View File

@@ -1,10 +1,16 @@
+import asyncio
 import functools
 import logging
 import sys
 from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
+from cached_property import cached_property
+from bonobo import settings
 from bonobo.constants import BEGIN, END
+from bonobo.execution.contexts.graph import AsyncGraphExecutionContext
 from bonobo.execution.strategies.base import Strategy
+from bonobo.util import get_name
 logger = logging.getLogger(__name__)
@@ -73,6 +79,35 @@ class ThreadPoolExecutorStrategy(ExecutorStrategy):
         return self.executor_factory(max_workers=len(graph))
+class AsyncThreadPoolExecutorStrategy(ThreadPoolExecutorStrategy):
+    GraphExecutionContextType = AsyncGraphExecutionContext
+    def __init__(self, GraphExecutionContextType=None):
+        if not settings.ALPHA.get():
+            raise NotImplementedError(
+                '{} is experimental, you need to explicitely activate it using ALPHA=True in system env.'.format(
+                    get_name(self)
+                )
+            )
+        super().__init__(GraphExecutionContextType)
+    @cached_property
+    def loop(self):
+        return asyncio.get_event_loop()
+    def create_graph_execution_context(self, *args, **kwargs):
+        return super(AsyncThreadPoolExecutorStrategy, self).create_graph_execution_context(
+            *args, **kwargs, loop=self.loop
+        )
+    def get_starter(self, executor, futures):
+        return functools.partial(
+            self.loop.run_in_executor,
+            executor,
+            super(AsyncThreadPoolExecutorStrategy, self).get_starter(executor, futures),
+        )
 class ProcessPoolExecutorStrategy(ExecutorStrategy):
     executor_factory = ProcessPoolExecutor
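
The constructor guard means the class cannot even be instantiated unless the ALPHA setting is truthy. A quick sketch of the failure path with default settings, assuming ALPHA is not set in the environment:

from bonobo.execution.strategies.executor import AsyncThreadPoolExecutorStrategy

try:
    AsyncThreadPoolExecutorStrategy()
except NotImplementedError as exc:
    print(exc)  # "... is experimental, you need to explicitely activate it using ALPHA=True in system env."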

View File

@@ -1,4 +1,3 @@
-from bonobo.constants import BEGIN, END
 from bonobo.execution.strategies.base import Strategy
@@ -6,20 +5,6 @@ class NaiveStrategy(Strategy):
     # TODO: how to run plugins in "naive" mode ?
     def execute(self, graph, **kwargs):
-        context = self.create_graph_execution_context(graph, **kwargs)
-        context.write(BEGIN, (), END)
-        # start
-        context.start()
-        # loop
-        nodes = list(context.nodes)
-        while len(nodes):
-            for node in nodes:
-                node.loop()
-            nodes = list(node for node in nodes if node.alive)
-        # stop
-        context.stop()
+        with self.create_graph_execution_context(graph, **kwargs) as context:
+            context.run_until_complete()
         return context
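
The naive strategy now delegates the whole begin/loop/stop dance to the context's run_until_complete(), which writes BEGIN/EMPTY/END and steps nodes until they go inactive. A sketch of driving a context the same way by hand, assuming BaseContext provides the start/stop context-manager protocol this rewrite relies on; the graph is illustrative only:

import bonobo
from bonobo.execution.contexts.graph import GraphExecutionContext

graph = bonobo.Graph(lambda: 'hello', print)  # illustrative graph, not from the commit

with GraphExecutionContext(graph) as context:
    context.run_until_complete()  # write(BEGIN, EMPTY, END), then loop() until nodes are done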

View File

@@ -84,6 +84,9 @@ DEBUG = Setting('DEBUG', formatter=to_bool, default=False)
 # Profile mode.
 PROFILE = Setting('PROFILE', formatter=to_bool, default=False)
+# Alpha mode.
+ALPHA = Setting('ALPHA', formatter=to_bool, default=False)
 # Quiet mode.
 QUIET = Setting('QUIET', formatter=to_bool, default=False)

View File

@@ -16,6 +16,7 @@
 from abc import ABCMeta, abstractmethod
 from queue import Queue
+from asyncio.queues import Queue as AioQueue
 from bonobo.constants import BEGIN, END
 from bonobo.errors import AbstractError, InactiveReadableError, InactiveWritableError
@@ -115,3 +116,7 @@ class Input(Queue, Readable, Writable):
     @property
     def alive(self):
         return self._runlevel > 0
+class AioInput(AioQueue):
+    pass

View File

@@ -1,18 +1,3 @@
-# -*- coding: utf-8 -*-
-#
-# copyright 2012-2014 romain dorgueil
-#
-# licensed under the apache license, version 2.0 (the "license");
-# you may not use this file except in compliance with the license.
-# you may obtain a copy of the license at
-#
-# http://www.apache.org/licenses/license-2.0
-#
-# unless required by applicable law or agreed to in writing, software
-# distributed under the license is distributed on an "as is" basis,
-# without warranties or conditions of any kind, either express or implied.
-# see the license for the specific language governing permissions and
-# limitations under the license.
 import time
@@ -39,6 +24,7 @@ class Timer:
     def __enter__(self):
         self.__start = time.time()
+        return self
     def __exit__(self, type=None, value=None, traceback=None):
         # Error handling here
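
The one-line "return self" is what makes "with Timer() as timer:" (as used in the example further up) bind the timer instance instead of None; without it, printing the elapsed time after the block would just print None. A minimal usage sketch, assuming Timer's string form reports the measured duration:

import time

from bonobo.util.statistics import Timer

with Timer() as timer:
    time.sleep(0.1)  # stand-in for real work

print('Elapsed:', timer)  # assumes Timer.__str__ formats the elapsed time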

View File

@@ -20,14 +20,14 @@ jinja2==2.10
 markupsafe==1.0
 more-itertools==4.2.0
 packaging==17.1
-pluggy==0.6.0
+pluggy==0.7.1
 poyo==0.4.1
 py==1.5.4
 pygments==2.2.0
 pyparsing==2.2.0
 pytest-cov==2.5.1
 pytest-timeout==1.3.1
-pytest==3.6.3
+pytest==3.6.4
 python-dateutil==2.7.3
 pytz==2018.5
 requests==2.19.1

View File

@@ -1,5 +1,6 @@
 -e .
 appdirs==1.4.3
+cached-property==1.4.3
 certifi==2018.4.16
 chardet==3.0.4
 colorama==0.3.9

View File

@@ -1,4 +1,4 @@
-# Generated by Medikit 0.6.3 on 2018-07-28.
+# Generated by Medikit 0.6.3 on 2018-07-29.
 # All changes will be overriden.
 # Edit Projectfile and run “make update” (or “medikit update”) to regenerate.
@@ -61,8 +61,9 @@ setup(
     packages=find_packages(exclude=['ez_setup', 'example', 'test']),
     include_package_data=True,
     install_requires=[
-        'fs (~= 2.0)', 'graphviz (>= 0.8, < 0.9)', 'jinja2 (~= 2.9)', 'mondrian (~= 0.7)', 'packaging (~= 17.0)',
-        'psutil (~= 5.4)', 'python-slugify (~= 1.2.0)', 'requests (~= 2.0)', 'stevedore (~= 1.27)', 'whistle (~= 1.0)'
+        'cached-property (~= 1.4)', 'fs (~= 2.0)', 'graphviz (>= 0.8, < 0.9)', 'jinja2 (~= 2.9)', 'mondrian (~= 0.7)',
+        'packaging (~= 17.0)', 'psutil (~= 5.4)', 'python-slugify (~= 1.2.0)', 'requests (~= 2.0)',
+        'stevedore (~= 1.27)', 'whistle (~= 1.0)'
     ],
     extras_require={
         'dev': [