diff --git a/Makefile b/Makefile index 8baafb1..483467d 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -# Generated by Medikit 0.4a10 on 2017-11-02. +# Generated by Medikit 0.4a10 on 2017-11-03. # All changes will be overriden. PACKAGE ?= bonobo diff --git a/Projectfile b/Projectfile index a91bfe5..7aa05b5 100644 --- a/Projectfile +++ b/Projectfile @@ -42,11 +42,12 @@ python.setup( python.add_requirements( 'fs >=2.0,<2.1', 'jinja2 >=2.9,<2.10', - 'mondrian ==0.4a0', + 'mondrian ==0.4a1', 'packaging >=16,<17', 'psutil >=5.4,<6.0', 'requests >=2.0,<3.0', 'stevedore >=1.27,<1.28', + 'whistle ==1.0a3', dev=[ 'pytest-sugar >=0.8,<0.9', 'pytest-timeout >=1,<2', diff --git a/bonobo/_api.py b/bonobo/_api.py index d4dbda2..9a82f41 100644 --- a/bonobo/_api.py +++ b/bonobo/_api.py @@ -49,10 +49,9 @@ def run(graph, *, plugins=None, services=None, strategy=None): if not settings.QUIET.get(): # pragma: no cover if _is_interactive_console(): import mondrian - mondrian.setup() - mondrian.setupExceptHook() + mondrian.setup(excepthook=True) - from bonobo.ext.console import ConsoleOutputPlugin + from bonobo.plugins.console import ConsoleOutputPlugin if ConsoleOutputPlugin not in plugins: plugins.append(ConsoleOutputPlugin) @@ -70,7 +69,10 @@ def run(graph, *, plugins=None, services=None, strategy=None): if JupyterOutputPlugin not in plugins: plugins.append(JupyterOutputPlugin) - return create_strategy(strategy).execute(graph, plugins=plugins, services=services) + import logging + logging.getLogger().setLevel(settings.LOGGING_LEVEL.get()) + strategy = create_strategy(strategy) + return strategy.execute(graph, plugins=plugins, services=services) def _inspect_as_graph(graph): diff --git a/bonobo/commands/__init__.py b/bonobo/commands/__init__.py index 39cfa05..f42d5c6 100644 --- a/bonobo/commands/__init__.py +++ b/bonobo/commands/__init__.py @@ -14,9 +14,7 @@ def entrypoint(args=None): """ - mondrian.setup() - mondrian.setupExceptHook() - + mondrian.setup(excepthook=True) logger = logging.getLogger() parser = argparse.ArgumentParser() diff --git a/bonobo/examples/clock.py b/bonobo/examples/clock.py new file mode 100644 index 0000000..765f077 --- /dev/null +++ b/bonobo/examples/clock.py @@ -0,0 +1,26 @@ +import bonobo +import datetime +import time + + +def extract(): + """Placeholder, change, rename, remove... """ + for x in range(60): + if x: + time.sleep(1) + yield datetime.datetime.now() + + +def get_graph(): + graph = bonobo.Graph() + graph.add_chain( + extract, + print, + ) + + return graph + +if __name__ == '__main__': + parser = bonobo.get_argument_parser() + with bonobo.parse_args(parser): + bonobo.run(get_graph()) diff --git a/bonobo/execution/base.py b/bonobo/execution/base.py index 08375a8..74dd89b 100644 --- a/bonobo/execution/base.py +++ b/bonobo/execution/base.py @@ -29,8 +29,7 @@ def unrecoverable(error_handler): class LoopingExecutionContext(Wrapper): - alive = True - PERIOD = 0.25 + PERIOD = 0.5 @property def started(self): @@ -40,6 +39,19 @@ class LoopingExecutionContext(Wrapper): def stopped(self): return self._stopped + @property + def alive(self): + return self._started and not self._stopped + + @property + def status(self): + """One character status for this node. """ + if not self.started: + return ' ' + if not self.stopped: + return '+' + return '-' + def __init__(self, wrapped, parent, services=None): super().__init__(wrapped) @@ -84,7 +96,6 @@ class LoopingExecutionContext(Wrapper): """Generic loop. A bit boring. """ while self.alive: self.step() - sleep(self.PERIOD) def step(self): """Left as an exercise for the children.""" diff --git a/bonobo/execution/events.py b/bonobo/execution/events.py new file mode 100644 index 0000000..036e879 --- /dev/null +++ b/bonobo/execution/events.py @@ -0,0 +1,13 @@ +from whistle import Event + +START = 'execution.start' +STARTED = 'execution.started' +TICK = 'execution.tick' +STOP = 'execution.stop' +STOPPED = 'execution.stopped' +KILL = 'execution.kill' + + +class ExecutionEvent(Event): + def __init__(self, graph_context): + self.graph_context = graph_context diff --git a/bonobo/execution/graph.py b/bonobo/execution/graph.py index 77e01fa..deaa150 100644 --- a/bonobo/execution/graph.py +++ b/bonobo/execution/graph.py @@ -1,8 +1,11 @@ -import time from functools import partial +from time import sleep + +from whistle import EventDispatcher from bonobo.config import create_container from bonobo.constants import BEGIN, END +from bonobo.execution import events from bonobo.execution.node import NodeExecutionContext from bonobo.execution.plugin import PluginExecutionContext @@ -11,6 +14,8 @@ class GraphExecutionContext: NodeExecutionContextType = NodeExecutionContext PluginExecutionContextType = PluginExecutionContext + TICK_PERIOD = 0.25 + @property def started(self): return any(node.started for node in self.nodes) @@ -23,7 +28,8 @@ class GraphExecutionContext: def alive(self): return any(node.alive for node in self.nodes) - def __init__(self, graph, plugins=None, services=None): + def __init__(self, graph, plugins=None, services=None, dispatcher=None): + self.dispatcher = dispatcher or EventDispatcher() self.graph = graph self.nodes = [self.create_node_execution_context_for(node) for node in self.graph] self.plugins = [self.create_plugin_execution_context_for(plugin) for plugin in plugins or ()] @@ -53,6 +59,8 @@ class GraphExecutionContext: return self.NodeExecutionContextType(node, parent=self) def create_plugin_execution_context_for(self, plugin): + if isinstance(plugin, type): + plugin = plugin() return self.PluginExecutionContextType(plugin, parent=self) def write(self, *messages): @@ -63,23 +71,45 @@ class GraphExecutionContext: for message in messages: self[i].write(message) + def dispatch(self, name): + self.dispatcher.dispatch(name, events.ExecutionEvent(self)) + def start(self, starter=None): + self.register_plugins() + self.dispatch(events.START) + self.tick() for node in self.nodes: if starter is None: node.start() else: starter(node) + self.dispatch(events.STARTED) - def start_plugins(self, starter=None): - for plugin in self.plugins: - if starter is None: - plugin.start() - else: - starter(plugin) + def tick(self): + self.dispatch(events.TICK) + sleep(self.TICK_PERIOD) + + def kill(self): + self.dispatch(events.KILL) + for node_context in self.nodes: + node_context.kill() + self.tick() def stop(self, stopper=None): - for node in self.nodes: + self.dispatch(events.STOP) + for node_context in self.nodes: if stopper is None: - node.stop() + node_context.stop() else: - stopper(node) + stopper(node_context) + self.tick() + self.dispatch(events.STOPPED) + self.unregister_plugins() + + def register_plugins(self): + for plugin_context in self.plugins: + plugin_context.register() + + def unregister_plugins(self): + for plugin_context in self.plugins: + plugin_context.unregister() diff --git a/bonobo/execution/node.py b/bonobo/execution/node.py index 7781a78..fdb0c9f 100644 --- a/bonobo/execution/node.py +++ b/bonobo/execution/node.py @@ -1,10 +1,9 @@ -import traceback +import sys +import threading from queue import Empty from time import sleep from types import GeneratorType -import sys - from bonobo.constants import NOT_MODIFIED, BEGIN, END from bonobo.errors import InactiveReadableError, UnrecoverableError from bonobo.execution.base import LoopingExecutionContext @@ -22,13 +21,8 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): """ @property - def alive(self): - """todo check if this is right, and where it is used""" - return self._started and not self._stopped - - @property - def alive_str(self): - return '+' if self.alive else '-' + def killed(self): + return self._killed def __init__(self, wrapped, parent=None, services=None, _input=None, _outputs=None): LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services) @@ -36,13 +30,19 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): self.input = _input or Input() self.outputs = _outputs or [] + self._killed = False def __str__(self): - return self.alive_str + ' ' + self.__name__ + self.get_statistics_as_string(prefix=' ') + return self.__name__ + self.get_statistics_as_string(prefix=' ') def __repr__(self): name, type_name = get_name(self), get_name(type(self)) - return '<{}({}{}){}>'.format(type_name, self.alive_str, name, self.get_statistics_as_string(prefix=' ')) + return '<{}({}{}){}>'.format(type_name, self.status, name, self.get_statistics_as_string(prefix=' ')) + + def get_flags_as_string(self): + if self.killed: + return '[killed]' + return '' def write(self, *messages): """ @@ -92,22 +92,26 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): return row def loop(self): - while True: + while not self._killed: try: self.step() except KeyboardInterrupt: - raise + self.handle_error(*sys.exc_info()) + break except InactiveReadableError: break except Empty: sleep(self.PERIOD) continue - except UnrecoverableError as exc: + except UnrecoverableError: self.handle_error(*sys.exc_info()) self.input.shutdown() break - except Exception as exc: # pylint: disable=broad-except + except Exception: # pylint: disable=broad-except self.handle_error(*sys.exc_info()) + except BaseException: + self.handle_error(*sys.exc_info()) + break def step(self): # Pull data from the first available input channel. @@ -119,6 +123,15 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): # todo add timer self.handle_results(input_bag, input_bag.apply(self._stack)) + def kill(self): + if not self.started: + raise RuntimeError('Cannot kill a node context that has not started yet.') + + if self.stopped: + raise RuntimeError('Cannot kill a node context that has already stopped.') + + self._killed = True + def handle_results(self, input_bag, results): # self._exec_time += timer.duration # Put data onto output channels @@ -126,6 +139,9 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): if isinstance(results, GeneratorType): while True: try: + # if kill flag was step, stop iterating. + if self._killed: + break result = next(results) except StopIteration: break @@ -140,7 +156,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext): def isflag(param): - return isinstance(param, Token) and param in (NOT_MODIFIED, ) + return isinstance(param, Token) and param in (NOT_MODIFIED,) def split_tokens(output): @@ -152,11 +168,11 @@ def split_tokens(output): """ if isinstance(output, Token): # just a flag - return (output, ), () + return (output,), () if not istuple(output): # no flag - return (), (output, ) + return (), (output,) i = 0 while isflag(output[i]): diff --git a/bonobo/execution/plugin.py b/bonobo/execution/plugin.py index 3379fc0..f552724 100644 --- a/bonobo/execution/plugin.py +++ b/bonobo/execution/plugin.py @@ -2,25 +2,12 @@ from bonobo.execution.base import LoopingExecutionContext, recoverable class PluginExecutionContext(LoopingExecutionContext): - PERIOD = 0.5 + @property + def dispatcher(self): + return self.parent.dispatcher - def __init__(self, wrapped, parent): - # Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure - # plugins, for example if it depends on an external service. - super().__init__(wrapped(self), parent) + def register(self): + return self.wrapped.register(self.dispatcher) - def start(self): - super().start() - - with recoverable(self.handle_error): - self.wrapped.on_start() - - def shutdown(self): - if self.started: - with recoverable(self.handle_error): - self.wrapped.on_stop() - self.alive = False - - def step(self): - with recoverable(self.handle_error): - self.wrapped.on_tick() + def unregister(self): + return self.wrapped.unregister(self.dispatcher) diff --git a/bonobo/ext/django.py b/bonobo/ext/django.py index d9d17f7..60b583c 100644 --- a/bonobo/ext/django.py +++ b/bonobo/ext/django.py @@ -5,7 +5,7 @@ from django.core.management.base import BaseCommand, OutputWrapper import bonobo import bonobo.util -from bonobo.ext.console import ConsoleOutputPlugin +from bonobo.plugins.console import ConsoleOutputPlugin from bonobo.util.term import CLEAR_EOL diff --git a/bonobo/plugins.py b/bonobo/plugins/__init__.py similarity index 67% rename from bonobo/plugins.py rename to bonobo/plugins/__init__.py index 7a0f5d1..897b687 100644 --- a/bonobo/plugins.py +++ b/bonobo/plugins/__init__.py @@ -10,5 +10,14 @@ class Plugin: """ - def __init__(self, context): - self.context = context + def register(self, dispatcher): + """ + :param dispatcher: whistle.EventDispatcher + """ + pass + + def unregister(self, dispatcher): + """ + :param dispatcher: whistle.EventDispatcher + """ + pass diff --git a/bonobo/ext/console.py b/bonobo/plugins/console.py similarity index 83% rename from bonobo/ext/console.py rename to bonobo/plugins/console.py index 0e6abb3..814894b 100644 --- a/bonobo/ext/console.py +++ b/bonobo/plugins/console.py @@ -2,38 +2,14 @@ import io import sys from contextlib import redirect_stdout, redirect_stderr -from colorama import Style, Fore, init - -init(wrap=True) +from colorama import Style, Fore, init as initialize_colorama_output_wrappers from bonobo import settings +from bonobo.execution import events from bonobo.plugins import Plugin from bonobo.util.term import CLEAR_EOL, MOVE_CURSOR_UP - -class IOBuffer(): - """ - The role of IOBuffer is to overcome the problem of multiple threads wanting to write to stdout at the same time. It - works a bit like a videogame: there are two buffers, one that is used to write, and one which is used to read from. - On each cycle, we swap the buffers, and the console plugin handle output of the one which is not anymore "active". - - """ - - def __init__(self): - self.current = io.StringIO() - self.write = self.current.write - - def switch(self): - previous = self.current - self.current = io.StringIO() - self.write = self.current.write - try: - return previous.getvalue() - finally: - previous.close() - - def flush(self): - self.current.flush() +initialize_colorama_output_wrappers(wrap=True) class ConsoleOutputPlugin(Plugin): @@ -60,13 +36,24 @@ class ConsoleOutputPlugin(Plugin): # Whether we're on windows, or a real operating system. iswindows = (sys.platform == 'win32') - def on_start(self): + def __init__(self): + self.isatty = self._stdout.isatty() + + def register(self, dispatcher): + dispatcher.add_listener(events.START, self.setup) + dispatcher.add_listener(events.TICK, self.tick) + dispatcher.add_listener(events.STOPPED, self.teardown) + + def unregister(self, dispatcher): + dispatcher.remove_listener(events.STOPPED, self.teardown) + dispatcher.remove_listener(events.TICK, self.tick) + dispatcher.remove_listener(events.START, self.setup) + + def setup(self, event): self.prefix = '' self.counter = 0 self._append_cache = '' - self.isatty = self._stdout.isatty() - self.stdout = IOBuffer() self.redirect_stdout = redirect_stdout(self._stdout if self.iswindows else self.stdout) self.redirect_stdout.__enter__() @@ -75,14 +62,14 @@ class ConsoleOutputPlugin(Plugin): self.redirect_stderr = redirect_stderr(self._stderr if self.iswindows else self.stderr) self.redirect_stderr.__enter__() - def on_tick(self): + def tick(self, event): if self.isatty and not self.iswindows: - self._write(self.context.parent, rewind=True) + self._write(event.graph_context, rewind=True) else: pass # not a tty, or windows, so we'll ignore stats output - def on_stop(self): - self._write(self.context.parent, rewind=False) + def teardown(self, event): + self._write(event.graph_context, rewind=False) self.redirect_stderr.__exit__(None, None, None) self.redirect_stdout.__exit__(None, None, None) @@ -113,6 +100,8 @@ class ConsoleOutputPlugin(Plugin): name_suffix, ' ', node.get_statistics_as_string(), + ' ', + node.get_flags_as_string(), Style.RESET_ALL, ' ', ) @@ -128,6 +117,8 @@ class ConsoleOutputPlugin(Plugin): name_suffix, ' ', node.get_statistics_as_string(), + ' ', + node.get_flags_as_string(), Style.RESET_ALL, ' ', ) @@ -166,7 +157,32 @@ class ConsoleOutputPlugin(Plugin): self.counter += 1 +class IOBuffer(): + """ + The role of IOBuffer is to overcome the problem of multiple threads wanting to write to stdout at the same time. It + works a bit like a videogame: there are two buffers, one that is used to write, and one which is used to read from. + On each cycle, we swap the buffers, and the console plugin handle output of the one which is not anymore "active". + + """ + + def __init__(self): + self.current = io.StringIO() + self.write = self.current.write + + def switch(self): + previous = self.current + self.current = io.StringIO() + self.write = self.current.write + try: + return previous.getvalue() + finally: + previous.close() + + def flush(self): + self.current.flush() + + def memory_usage(): import os, psutil process = psutil.Process(os.getpid()) - return process.memory_info()[0] / float(2**20) + return process.memory_info()[0] / float(2 ** 20) diff --git a/bonobo/strategies/executor.py b/bonobo/strategies/executor.py index 8c27d40..e5ffdc0 100644 --- a/bonobo/strategies/executor.py +++ b/bonobo/strategies/executor.py @@ -1,10 +1,8 @@ -import time - +import functools +import logging import sys - -import mondrian -import traceback -from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor +from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor, wait, FIRST_EXCEPTION +from time import sleep from bonobo.util import get_name from bonobo.constants import BEGIN, END @@ -27,60 +25,38 @@ class ExecutorStrategy(Strategy): context = self.create_graph_execution_context(graph, **kwargs) context.write(BEGIN, Bag(), END) - executor = self.create_executor() - futures = [] - context.start_plugins(self.get_plugin_starter(executor, futures)) - context.start(self.get_starter(executor, futures)) + with self.create_executor() as executor: + context.start(self.get_starter(executor, futures)) - while context.alive: - time.sleep(0.1) + while context.alive: + try: + context.tick() + except KeyboardInterrupt: + logging.getLogger(__name__).warning('KeyboardInterrupt received. Trying to terminate the nodes gracefully.') + context.kill() + break - for plugin_context in context.plugins: - plugin_context.shutdown() - - context.stop() - - executor.shutdown() + context.stop() return context def get_starter(self, executor, futures): def starter(node): + @functools.wraps(node) def _runner(): try: - node.start() - except Exception: - mondrian.excepthook(*sys.exc_info(), context='Could not start node {}.'.format(get_name(node))) - node.input.on_end() - else: - node.loop() - - try: - node.stop() - except Exception: - mondrian.excepthook(*sys.exc_info(), context='Could not stop node {}.'.format(get_name(node))) + with node: + node.loop() + except BaseException as exc: + logging.getLogger(__name__).info('Got {} in {} runner.'.format(get_name(exc), node), + exc_info=sys.exc_info()) futures.append(executor.submit(_runner)) return starter - def get_plugin_starter(self, executor, futures): - def plugin_starter(plugin): - def _runner(): - with plugin: - try: - plugin.loop() - except Exception: - mondrian.excepthook( - *sys.exc_info(), context='In plugin loop for {}...'.format(get_name(plugin)) - ) - - futures.append(executor.submit(_runner)) - - return plugin_starter - class ThreadPoolExecutorStrategy(ExecutorStrategy): executor_factory = ThreadPoolExecutor diff --git a/bonobo/strategies/util.py b/bonobo/strategies/util.py deleted file mode 100644 index 8b13789..0000000 --- a/bonobo/strategies/util.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/requirements.txt b/requirements.txt index 82f5cdb..f579bdc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,7 @@ fs==2.0.12 idna==2.6 jinja2==2.9.6 markupsafe==1.0 -mondrian==0.4a0 +mondrian==0.3.0 packaging==16.8 pbr==3.1.1 psutil==5.4.0 @@ -17,3 +17,4 @@ requests==2.18.4 six==1.11.0 stevedore==1.27.1 urllib3==1.22 +whistle==1.0a3 diff --git a/setup.py b/setup.py index 219e0e6..b914b72 100644 --- a/setup.py +++ b/setup.py @@ -53,8 +53,8 @@ setup( packages=find_packages(exclude=['ez_setup', 'example', 'test']), include_package_data=True, install_requires=[ - 'colorama (>= 0.3)', 'fs (>= 2.0, < 2.1)', 'jinja2 (>= 2.9, < 2.10)', 'mondrian (== 0.4a0)', - 'packaging (>= 16, < 17)', 'psutil (>= 5.4, < 6.0)', 'requests (>= 2.0, < 3.0)', 'stevedore (>= 1.27, < 1.28)' + 'colorama (>= 0.3)', 'fs (>= 2.0, < 2.1)', 'jinja2 (>= 2.9, < 2.10)', 'mondrian', 'packaging (>= 16, < 17)', + 'psutil (>= 5.4, < 6.0)', 'requests (>= 2.0, < 3.0)', 'stevedore (>= 1.27, < 1.28)', 'whistle (== 1.0a3)' ], extras_require={ 'dev': [