From 6bd1130e34021888d63cdc840e109254b1ce5f39 Mon Sep 17 00:00:00 2001 From: Romain Dorgueil Date: Sat, 4 Nov 2017 11:20:15 +0100 Subject: [PATCH] [core] Refactoring to use an event dispatcher in the main thread. Plugins now run in the main thread, instead of their own threads, and the API changed to use an event dispatcher approach instead of a static class interface. --- Makefile | 2 +- Projectfile | 3 +- bonobo/_api.py | 10 ++- bonobo/commands/__init__.py | 4 +- bonobo/examples/clock.py | 26 +++++++ bonobo/execution/base.py | 17 ++++- bonobo/execution/events.py | 13 ++++ bonobo/execution/graph.py | 52 ++++++++++--- bonobo/execution/node.py | 54 +++++++++----- bonobo/execution/plugin.py | 27 ++----- bonobo/ext/django.py | 2 +- bonobo/{plugins.py => plugins/__init__.py} | 13 +++- bonobo/{ext => plugins}/console.py | 86 +++++++++++++--------- bonobo/strategies/executor.py | 64 +++++----------- bonobo/strategies/util.py | 1 - requirements.txt | 3 +- setup.py | 4 +- 17 files changed, 233 insertions(+), 148 deletions(-) create mode 100644 bonobo/examples/clock.py create mode 100644 bonobo/execution/events.py rename bonobo/{plugins.py => plugins/__init__.py} (67%) rename bonobo/{ext => plugins}/console.py (83%) delete mode 100644 bonobo/strategies/util.py diff --git a/Makefile b/Makefile index 8baafb1..483467d 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -# Generated by Medikit 0.4a10 on 2017-11-02. +# Generated by Medikit 0.4a10 on 2017-11-03. # All changes will be overriden. 
PACKAGE ?= bonobo diff --git a/Projectfile b/Projectfile index a91bfe5..7aa05b5 100644 --- a/Projectfile +++ b/Projectfile @@ -42,11 +42,12 @@ python.setup( python.add_requirements( 'fs >=2.0,<2.1', 'jinja2 >=2.9,<2.10', - 'mondrian ==0.4a0', + 'mondrian ==0.4a1', 'packaging >=16,<17', 'psutil >=5.4,<6.0', 'requests >=2.0,<3.0', 'stevedore >=1.27,<1.28', + 'whistle ==1.0a3', dev=[ 'pytest-sugar >=0.8,<0.9', 'pytest-timeout >=1,<2', diff --git a/bonobo/_api.py b/bonobo/_api.py index d4dbda2..9a82f41 100644 --- a/bonobo/_api.py +++ b/bonobo/_api.py @@ -49,10 +49,9 @@ def run(graph, *, plugins=None, services=None, strategy=None): if not settings.QUIET.get(): # pragma: no cover if _is_interactive_console(): import mondrian - mondrian.setup() - mondrian.setupExceptHook() + mondrian.setup(excepthook=True) - from bonobo.ext.console import ConsoleOutputPlugin + from bonobo.plugins.console import ConsoleOutputPlugin if ConsoleOutputPlugin not in plugins: plugins.append(ConsoleOutputPlugin) @@ -70,7 +69,10 @@ def run(graph, *, plugins=None, services=None, strategy=None): if JupyterOutputPlugin not in plugins: plugins.append(JupyterOutputPlugin) - return create_strategy(strategy).execute(graph, plugins=plugins, services=services) + import logging + logging.getLogger().setLevel(settings.LOGGING_LEVEL.get()) + strategy = create_strategy(strategy) + return strategy.execute(graph, plugins=plugins, services=services) def _inspect_as_graph(graph): diff --git a/bonobo/commands/__init__.py b/bonobo/commands/__init__.py index 39cfa05..f42d5c6 100644 --- a/bonobo/commands/__init__.py +++ b/bonobo/commands/__init__.py @@ -14,9 +14,7 @@ def entrypoint(args=None): """ - mondrian.setup() - mondrian.setupExceptHook() - + mondrian.setup(excepthook=True) logger = logging.getLogger() parser = argparse.ArgumentParser() diff --git a/bonobo/examples/clock.py b/bonobo/examples/clock.py new file mode 100644 index 0000000..765f077 --- /dev/null +++ b/bonobo/examples/clock.py @@ -0,0 +1,26 @@ +import 
bonobo +import datetime +import time + + +def extract(): + """Placeholder, change, rename, remove... """ + for x in range(60): + if x: + time.sleep(1) + yield datetime.datetime.now() + + +def get_graph(): + graph = bonobo.Graph() + graph.add_chain( + extract, + print, + ) + + return graph + +if __name__ == '__main__': + parser = bonobo.get_argument_parser() + with bonobo.parse_args(parser): + bonobo.run(get_graph()) diff --git a/bonobo/execution/base.py b/bonobo/execution/base.py index 08375a8..74dd89b 100644 --- a/bonobo/execution/base.py +++ b/bonobo/execution/base.py @@ -29,8 +29,7 @@ def unrecoverable(error_handler): class LoopingExecutionContext(Wrapper): - alive = True - PERIOD = 0.25 + PERIOD = 0.5 @property def started(self): @@ -40,6 +39,19 @@ class LoopingExecutionContext(Wrapper): def stopped(self): return self._stopped + @property + def alive(self): + return self._started and not self._stopped + + @property + def status(self): + """One character status for this node. """ + if not self.started: + return ' ' + if not self.stopped: + return '+' + return '-' + def __init__(self, wrapped, parent, services=None): super().__init__(wrapped) @@ -84,7 +96,6 @@ class LoopingExecutionContext(Wrapper): """Generic loop. A bit boring. 
""" while self.alive: self.step() - sleep(self.PERIOD) def step(self): """Left as an exercise for the children.""" diff --git a/bonobo/execution/events.py b/bonobo/execution/events.py new file mode 100644 index 0000000..036e879 --- /dev/null +++ b/bonobo/execution/events.py @@ -0,0 +1,13 @@ +from whistle import Event + +START = 'execution.start' +STARTED = 'execution.started' +TICK = 'execution.tick' +STOP = 'execution.stop' +STOPPED = 'execution.stopped' +KILL = 'execution.kill' + + +class ExecutionEvent(Event): + def __init__(self, graph_context): + self.graph_context = graph_context diff --git a/bonobo/execution/graph.py b/bonobo/execution/graph.py index 77e01fa..deaa150 100644 --- a/bonobo/execution/graph.py +++ b/bonobo/execution/graph.py @@ -1,8 +1,11 @@ -import time from functools import partial +from time import sleep + +from whistle import EventDispatcher from bonobo.config import create_container from bonobo.constants import BEGIN, END +from bonobo.execution import events from bonobo.execution.node import NodeExecutionContext from bonobo.execution.plugin import PluginExecutionContext @@ -11,6 +14,8 @@ class GraphExecutionContext: NodeExecutionContextType = NodeExecutionContext PluginExecutionContextType = PluginExecutionContext + TICK_PERIOD = 0.25 + @property def started(self): return any(node.started for node in self.nodes) @@ -23,7 +28,8 @@ class GraphExecutionContext: def alive(self): return any(node.alive for node in self.nodes) - def __init__(self, graph, plugins=None, services=None): + def __init__(self, graph, plugins=None, services=None, dispatcher=None): + self.dispatcher = dispatcher or EventDispatcher() self.graph = graph self.nodes = [self.create_node_execution_context_for(node) for node in self.graph] self.plugins = [self.create_plugin_execution_context_for(plugin) for plugin in plugins or ()] @@ -53,6 +59,8 @@ class GraphExecutionContext: return self.NodeExecutionContextType(node, parent=self) def create_plugin_execution_context_for(self, 
plugin): + if isinstance(plugin, type): + plugin = plugin() return self.PluginExecutionContextType(plugin, parent=self) def write(self, *messages): @@ -63,23 +71,45 @@ class GraphExecutionContext: for message in messages: self[i].write(message) + def dispatch(self, name): + self.dispatcher.dispatch(name, events.ExecutionEvent(self)) + def start(self, starter=None): + self.register_plugins() + self.dispatch(events.START) + self.tick() for node in self.nodes: if starter is None: node.start() else: starter(node) + self.dispatch(events.STARTED) - def start_plugins(self, starter=None): - for plugin in self.plugins: - if starter is None: - plugin.start() - else: - starter(plugin) + def tick(self): + self.dispatch(events.TICK) + sleep(self.TICK_PERIOD) + + def kill(self): + self.dispatch(events.KILL) + for node_context in self.nodes: + node_context.kill() + self.tick() def stop(self, stopper=None): - for node in self.nodes: + self.dispatch(events.STOP) + for node_context in self.nodes: if stopper is None: - node.stop() + node_context.stop() else: - stopper(node) + stopper(node_context) + self.tick() + self.dispatch(events.STOPPED) + self.unregister_plugins() + + def register_plugins(self): + for plugin_context in self.plugins: + plugin_context.register() + + def unregister_plugins(self): + for plugin_context in self.plugins: + plugin_context.unregister() diff --git a/bonobo/execution/node.py b/bonobo/execution/node.py index 7781a78..fdb0c9f 100644 --- a/bonobo/execution/node.py +++ b/bonobo/execution/node.py @@ -1,10 +1,9 @@ -import traceback +import sys +import threading from queue import Empty from time import sleep from types import GeneratorType -import sys - from bonobo.constants import NOT_MODIFIED, BEGIN, END from bonobo.errors import InactiveReadableError, UnrecoverableError from bonobo.execution.base import LoopingExecutionContext @@ -22,13 +21,8 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): """ @property - def alive(self): - """todo 
check if this is right, and where it is used""" - return self._started and not self._stopped - - @property - def alive_str(self): - return '+' if self.alive else '-' + def killed(self): + return self._killed def __init__(self, wrapped, parent=None, services=None, _input=None, _outputs=None): LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services) @@ -36,13 +30,19 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): self.input = _input or Input() self.outputs = _outputs or [] + self._killed = False def __str__(self): - return self.alive_str + ' ' + self.__name__ + self.get_statistics_as_string(prefix=' ') + return self.__name__ + self.get_statistics_as_string(prefix=' ') def __repr__(self): name, type_name = get_name(self), get_name(type(self)) - return '<{}({}{}){}>'.format(type_name, self.alive_str, name, self.get_statistics_as_string(prefix=' ')) + return '<{}({}{}){}>'.format(type_name, self.status, name, self.get_statistics_as_string(prefix=' ')) + + def get_flags_as_string(self): + if self.killed: + return '[killed]' + return '' def write(self, *messages): """ @@ -92,22 +92,26 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): return row def loop(self): - while True: + while not self._killed: try: self.step() except KeyboardInterrupt: - raise + self.handle_error(*sys.exc_info()) + break except InactiveReadableError: break except Empty: sleep(self.PERIOD) continue - except UnrecoverableError as exc: + except UnrecoverableError: self.handle_error(*sys.exc_info()) self.input.shutdown() break - except Exception as exc: # pylint: disable=broad-except + except Exception: # pylint: disable=broad-except self.handle_error(*sys.exc_info()) + except BaseException: + self.handle_error(*sys.exc_info()) + break def step(self): # Pull data from the first available input channel. 
@@ -119,6 +123,15 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): # todo add timer self.handle_results(input_bag, input_bag.apply(self._stack)) + def kill(self): + if not self.started: + raise RuntimeError('Cannot kill a node context that has not started yet.') + + if self.stopped: + raise RuntimeError('Cannot kill a node context that has already stopped.') + + self._killed = True + def handle_results(self, input_bag, results): # self._exec_time += timer.duration # Put data onto output channels @@ -126,6 +139,9 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): if isinstance(results, GeneratorType): while True: try: + # if kill flag was set, stop iterating. + if self._killed: + break result = next(results) except StopIteration: break @@ -140,7 +156,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): def isflag(param): - return isinstance(param, Token) and param in (NOT_MODIFIED, ) + return isinstance(param, Token) and param in (NOT_MODIFIED,) def split_tokens(output): @@ -152,11 +168,11 @@ def split_tokens(output): """ if isinstance(output, Token): # just a flag - return (output, ), () + return (output,), () if not istuple(output): # no flag - return (), (output, ) + return (), (output,) i = 0 while isflag(output[i]): diff --git a/bonobo/execution/plugin.py b/bonobo/execution/plugin.py index 3379fc0..f552724 100644 --- a/bonobo/execution/plugin.py +++ b/bonobo/execution/plugin.py @@ -2,25 +2,12 @@ from bonobo.execution.base import LoopingExecutionContext, recoverable class PluginExecutionContext(LoopingExecutionContext): - PERIOD = 0.5 + @property + def dispatcher(self): + return self.parent.dispatcher - def __init__(self, wrapped, parent): - # Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure - # plugins, for example if it depends on an external service.
- super().__init__(wrapped(self), parent) + def register(self): + return self.wrapped.register(self.dispatcher) - def start(self): - super().start() - - with recoverable(self.handle_error): - self.wrapped.on_start() - - def shutdown(self): - if self.started: - with recoverable(self.handle_error): - self.wrapped.on_stop() - self.alive = False - - def step(self): - with recoverable(self.handle_error): - self.wrapped.on_tick() + def unregister(self): + return self.wrapped.unregister(self.dispatcher) diff --git a/bonobo/ext/django.py b/bonobo/ext/django.py index d9d17f7..60b583c 100644 --- a/bonobo/ext/django.py +++ b/bonobo/ext/django.py @@ -5,7 +5,7 @@ from django.core.management.base import BaseCommand, OutputWrapper import bonobo import bonobo.util -from bonobo.ext.console import ConsoleOutputPlugin +from bonobo.plugins.console import ConsoleOutputPlugin from bonobo.util.term import CLEAR_EOL diff --git a/bonobo/plugins.py b/bonobo/plugins/__init__.py similarity index 67% rename from bonobo/plugins.py rename to bonobo/plugins/__init__.py index 7a0f5d1..897b687 100644 --- a/bonobo/plugins.py +++ b/bonobo/plugins/__init__.py @@ -10,5 +10,14 @@ class Plugin: """ - def __init__(self, context): - self.context = context + def register(self, dispatcher): + """ + :param dispatcher: whistle.EventDispatcher + """ + pass + + def unregister(self, dispatcher): + """ + :param dispatcher: whistle.EventDispatcher + """ + pass diff --git a/bonobo/ext/console.py b/bonobo/plugins/console.py similarity index 83% rename from bonobo/ext/console.py rename to bonobo/plugins/console.py index 0e6abb3..814894b 100644 --- a/bonobo/ext/console.py +++ b/bonobo/plugins/console.py @@ -2,38 +2,14 @@ import io import sys from contextlib import redirect_stdout, redirect_stderr -from colorama import Style, Fore, init - -init(wrap=True) +from colorama import Style, Fore, init as initialize_colorama_output_wrappers from bonobo import settings +from bonobo.execution import events from bonobo.plugins 
import Plugin from bonobo.util.term import CLEAR_EOL, MOVE_CURSOR_UP - -class IOBuffer(): - """ - The role of IOBuffer is to overcome the problem of multiple threads wanting to write to stdout at the same time. It - works a bit like a videogame: there are two buffers, one that is used to write, and one which is used to read from. - On each cycle, we swap the buffers, and the console plugin handle output of the one which is not anymore "active". - - """ - - def __init__(self): - self.current = io.StringIO() - self.write = self.current.write - - def switch(self): - previous = self.current - self.current = io.StringIO() - self.write = self.current.write - try: - return previous.getvalue() - finally: - previous.close() - - def flush(self): - self.current.flush() +initialize_colorama_output_wrappers(wrap=True) class ConsoleOutputPlugin(Plugin): @@ -60,13 +36,24 @@ class ConsoleOutputPlugin(Plugin): # Whether we're on windows, or a real operating system. iswindows = (sys.platform == 'win32') - def on_start(self): + def __init__(self): + self.isatty = self._stdout.isatty() + + def register(self, dispatcher): + dispatcher.add_listener(events.START, self.setup) + dispatcher.add_listener(events.TICK, self.tick) + dispatcher.add_listener(events.STOPPED, self.teardown) + + def unregister(self, dispatcher): + dispatcher.remove_listener(events.STOPPED, self.teardown) + dispatcher.remove_listener(events.TICK, self.tick) + dispatcher.remove_listener(events.START, self.setup) + + def setup(self, event): self.prefix = '' self.counter = 0 self._append_cache = '' - self.isatty = self._stdout.isatty() - self.stdout = IOBuffer() self.redirect_stdout = redirect_stdout(self._stdout if self.iswindows else self.stdout) self.redirect_stdout.__enter__() @@ -75,14 +62,14 @@ class ConsoleOutputPlugin(Plugin): self.redirect_stderr = redirect_stderr(self._stderr if self.iswindows else self.stderr) self.redirect_stderr.__enter__() - def on_tick(self): + def tick(self, event): if self.isatty and 
not self.iswindows: - self._write(self.context.parent, rewind=True) + self._write(event.graph_context, rewind=True) else: pass # not a tty, or windows, so we'll ignore stats output - def on_stop(self): - self._write(self.context.parent, rewind=False) + def teardown(self, event): + self._write(event.graph_context, rewind=False) self.redirect_stderr.__exit__(None, None, None) self.redirect_stdout.__exit__(None, None, None) @@ -113,6 +100,8 @@ class ConsoleOutputPlugin(Plugin): name_suffix, ' ', node.get_statistics_as_string(), + ' ', + node.get_flags_as_string(), Style.RESET_ALL, ' ', ) @@ -128,6 +117,8 @@ class ConsoleOutputPlugin(Plugin): name_suffix, ' ', node.get_statistics_as_string(), + ' ', + node.get_flags_as_string(), Style.RESET_ALL, ' ', ) @@ -166,7 +157,32 @@ class ConsoleOutputPlugin(Plugin): self.counter += 1 +class IOBuffer(): + """ + The role of IOBuffer is to overcome the problem of multiple threads wanting to write to stdout at the same time. It + works a bit like a videogame: there are two buffers, one that is used to write, and one which is used to read from. + On each cycle, we swap the buffers, and the console plugin handles output of the one which is no longer "active".
+ + """ + + def __init__(self): + self.current = io.StringIO() + self.write = self.current.write + + def switch(self): + previous = self.current + self.current = io.StringIO() + self.write = self.current.write + try: + return previous.getvalue() + finally: + previous.close() + + def flush(self): + self.current.flush() + + def memory_usage(): import os, psutil process = psutil.Process(os.getpid()) - return process.memory_info()[0] / float(2**20) + return process.memory_info()[0] / float(2 ** 20) diff --git a/bonobo/strategies/executor.py b/bonobo/strategies/executor.py index 8c27d40..e5ffdc0 100644 --- a/bonobo/strategies/executor.py +++ b/bonobo/strategies/executor.py @@ -1,10 +1,8 @@ -import time - +import functools +import logging import sys - -import mondrian -import traceback -from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor +from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor, wait, FIRST_EXCEPTION +from time import sleep from bonobo.util import get_name from bonobo.constants import BEGIN, END @@ -27,60 +25,38 @@ class ExecutorStrategy(Strategy): context = self.create_graph_execution_context(graph, **kwargs) context.write(BEGIN, Bag(), END) - executor = self.create_executor() - futures = [] - context.start_plugins(self.get_plugin_starter(executor, futures)) - context.start(self.get_starter(executor, futures)) + with self.create_executor() as executor: + context.start(self.get_starter(executor, futures)) - while context.alive: - time.sleep(0.1) + while context.alive: + try: + context.tick() + except KeyboardInterrupt: + logging.getLogger(__name__).warning('KeyboardInterrupt received. 
Trying to terminate the nodes gracefully.') + context.kill() + break - for plugin_context in context.plugins: - plugin_context.shutdown() - - context.stop() - - executor.shutdown() + context.stop() return context def get_starter(self, executor, futures): def starter(node): + @functools.wraps(node) def _runner(): try: - node.start() - except Exception: - mondrian.excepthook(*sys.exc_info(), context='Could not start node {}.'.format(get_name(node))) - node.input.on_end() - else: - node.loop() - - try: - node.stop() - except Exception: - mondrian.excepthook(*sys.exc_info(), context='Could not stop node {}.'.format(get_name(node))) + with node: + node.loop() + except BaseException as exc: + logging.getLogger(__name__).info('Got {} in {} runner.'.format(get_name(exc), node), + exc_info=sys.exc_info()) futures.append(executor.submit(_runner)) return starter - def get_plugin_starter(self, executor, futures): - def plugin_starter(plugin): - def _runner(): - with plugin: - try: - plugin.loop() - except Exception: - mondrian.excepthook( - *sys.exc_info(), context='In plugin loop for {}...'.format(get_name(plugin)) - ) - - futures.append(executor.submit(_runner)) - - return plugin_starter - class ThreadPoolExecutorStrategy(ExecutorStrategy): executor_factory = ThreadPoolExecutor diff --git a/bonobo/strategies/util.py b/bonobo/strategies/util.py deleted file mode 100644 index 8b13789..0000000 --- a/bonobo/strategies/util.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/requirements.txt b/requirements.txt index 82f5cdb..f579bdc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,7 @@ fs==2.0.12 idna==2.6 jinja2==2.9.6 markupsafe==1.0 -mondrian==0.4a0 +mondrian==0.3.0 packaging==16.8 pbr==3.1.1 psutil==5.4.0 @@ -17,3 +17,4 @@ requests==2.18.4 six==1.11.0 stevedore==1.27.1 urllib3==1.22 +whistle==1.0a3 diff --git a/setup.py b/setup.py index 219e0e6..b914b72 100644 --- a/setup.py +++ b/setup.py @@ -53,8 +53,8 @@ setup( packages=find_packages(exclude=['ez_setup', 
'example', 'test']), include_package_data=True, install_requires=[ - 'colorama (>= 0.3)', 'fs (>= 2.0, < 2.1)', 'jinja2 (>= 2.9, < 2.10)', 'mondrian (== 0.4a0)', - 'packaging (>= 16, < 17)', 'psutil (>= 5.4, < 6.0)', 'requests (>= 2.0, < 3.0)', 'stevedore (>= 1.27, < 1.28)' + 'colorama (>= 0.3)', 'fs (>= 2.0, < 2.1)', 'jinja2 (>= 2.9, < 2.10)', 'mondrian', 'packaging (>= 16, < 17)', + 'psutil (>= 5.4, < 6.0)', 'requests (>= 2.0, < 3.0)', 'stevedore (>= 1.27, < 1.28)', 'whistle (== 1.0a3)' ], extras_require={ 'dev': [