[core] Refactoring to use an event dispatcher in the main thread.

Plugins now run in the main thread, instead of their own threads, and
the API changed to use an event dispatcher approach instead of a static
class interface.
This commit is contained in:
Romain Dorgueil
2017-11-04 11:20:15 +01:00
parent d988d30474
commit 6bd1130e34
17 changed files with 233 additions and 148 deletions

View File

@ -1,4 +1,4 @@
# Generated by Medikit 0.4a10 on 2017-11-02. # Generated by Medikit 0.4a10 on 2017-11-03.
# All changes will be overriden. # All changes will be overriden.
PACKAGE ?= bonobo PACKAGE ?= bonobo

View File

@ -42,11 +42,12 @@ python.setup(
python.add_requirements( python.add_requirements(
'fs >=2.0,<2.1', 'fs >=2.0,<2.1',
'jinja2 >=2.9,<2.10', 'jinja2 >=2.9,<2.10',
'mondrian ==0.4a0', 'mondrian ==0.4a1',
'packaging >=16,<17', 'packaging >=16,<17',
'psutil >=5.4,<6.0', 'psutil >=5.4,<6.0',
'requests >=2.0,<3.0', 'requests >=2.0,<3.0',
'stevedore >=1.27,<1.28', 'stevedore >=1.27,<1.28',
'whistle ==1.0a3',
dev=[ dev=[
'pytest-sugar >=0.8,<0.9', 'pytest-sugar >=0.8,<0.9',
'pytest-timeout >=1,<2', 'pytest-timeout >=1,<2',

View File

@ -49,10 +49,9 @@ def run(graph, *, plugins=None, services=None, strategy=None):
if not settings.QUIET.get(): # pragma: no cover if not settings.QUIET.get(): # pragma: no cover
if _is_interactive_console(): if _is_interactive_console():
import mondrian import mondrian
mondrian.setup() mondrian.setup(excepthook=True)
mondrian.setupExceptHook()
from bonobo.ext.console import ConsoleOutputPlugin from bonobo.plugins.console import ConsoleOutputPlugin
if ConsoleOutputPlugin not in plugins: if ConsoleOutputPlugin not in plugins:
plugins.append(ConsoleOutputPlugin) plugins.append(ConsoleOutputPlugin)
@ -70,7 +69,10 @@ def run(graph, *, plugins=None, services=None, strategy=None):
if JupyterOutputPlugin not in plugins: if JupyterOutputPlugin not in plugins:
plugins.append(JupyterOutputPlugin) plugins.append(JupyterOutputPlugin)
return create_strategy(strategy).execute(graph, plugins=plugins, services=services) import logging
logging.getLogger().setLevel(settings.LOGGING_LEVEL.get())
strategy = create_strategy(strategy)
return strategy.execute(graph, plugins=plugins, services=services)
def _inspect_as_graph(graph): def _inspect_as_graph(graph):

View File

@ -14,9 +14,7 @@ def entrypoint(args=None):
""" """
mondrian.setup() mondrian.setup(excepthook=True)
mondrian.setupExceptHook()
logger = logging.getLogger() logger = logging.getLogger()
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()

26
bonobo/examples/clock.py Normal file
View File

@ -0,0 +1,26 @@
import bonobo
import datetime
import time
def extract():
"""Placeholder, change, rename, remove... """
for x in range(60):
if x:
time.sleep(1)
yield datetime.datetime.now()
def get_graph():
graph = bonobo.Graph()
graph.add_chain(
extract,
print,
)
return graph
if __name__ == '__main__':
parser = bonobo.get_argument_parser()
with bonobo.parse_args(parser):
bonobo.run(get_graph())

View File

@ -29,8 +29,7 @@ def unrecoverable(error_handler):
class LoopingExecutionContext(Wrapper): class LoopingExecutionContext(Wrapper):
alive = True PERIOD = 0.5
PERIOD = 0.25
@property @property
def started(self): def started(self):
@ -40,6 +39,19 @@ class LoopingExecutionContext(Wrapper):
def stopped(self): def stopped(self):
return self._stopped return self._stopped
@property
def alive(self):
return self._started and not self._stopped
@property
def status(self):
"""One character status for this node. """
if not self.started:
return ' '
if not self.stopped:
return '+'
return '-'
def __init__(self, wrapped, parent, services=None): def __init__(self, wrapped, parent, services=None):
super().__init__(wrapped) super().__init__(wrapped)
@ -84,7 +96,6 @@ class LoopingExecutionContext(Wrapper):
"""Generic loop. A bit boring. """ """Generic loop. A bit boring. """
while self.alive: while self.alive:
self.step() self.step()
sleep(self.PERIOD)
def step(self): def step(self):
"""Left as an exercise for the children.""" """Left as an exercise for the children."""

View File

@ -0,0 +1,13 @@
from whistle import Event
START = 'execution.start'
STARTED = 'execution.started'
TICK = 'execution.tick'
STOP = 'execution.stop'
STOPPED = 'execution.stopped'
KILL = 'execution.kill'
class ExecutionEvent(Event):
def __init__(self, graph_context):
self.graph_context = graph_context

View File

@ -1,8 +1,11 @@
import time
from functools import partial from functools import partial
from time import sleep
from whistle import EventDispatcher
from bonobo.config import create_container from bonobo.config import create_container
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
from bonobo.execution import events
from bonobo.execution.node import NodeExecutionContext from bonobo.execution.node import NodeExecutionContext
from bonobo.execution.plugin import PluginExecutionContext from bonobo.execution.plugin import PluginExecutionContext
@ -11,6 +14,8 @@ class GraphExecutionContext:
NodeExecutionContextType = NodeExecutionContext NodeExecutionContextType = NodeExecutionContext
PluginExecutionContextType = PluginExecutionContext PluginExecutionContextType = PluginExecutionContext
TICK_PERIOD = 0.25
@property @property
def started(self): def started(self):
return any(node.started for node in self.nodes) return any(node.started for node in self.nodes)
@ -23,7 +28,8 @@ class GraphExecutionContext:
def alive(self): def alive(self):
return any(node.alive for node in self.nodes) return any(node.alive for node in self.nodes)
def __init__(self, graph, plugins=None, services=None): def __init__(self, graph, plugins=None, services=None, dispatcher=None):
self.dispatcher = dispatcher or EventDispatcher()
self.graph = graph self.graph = graph
self.nodes = [self.create_node_execution_context_for(node) for node in self.graph] self.nodes = [self.create_node_execution_context_for(node) for node in self.graph]
self.plugins = [self.create_plugin_execution_context_for(plugin) for plugin in plugins or ()] self.plugins = [self.create_plugin_execution_context_for(plugin) for plugin in plugins or ()]
@ -53,6 +59,8 @@ class GraphExecutionContext:
return self.NodeExecutionContextType(node, parent=self) return self.NodeExecutionContextType(node, parent=self)
def create_plugin_execution_context_for(self, plugin): def create_plugin_execution_context_for(self, plugin):
if isinstance(plugin, type):
plugin = plugin()
return self.PluginExecutionContextType(plugin, parent=self) return self.PluginExecutionContextType(plugin, parent=self)
def write(self, *messages): def write(self, *messages):
@ -63,23 +71,45 @@ class GraphExecutionContext:
for message in messages: for message in messages:
self[i].write(message) self[i].write(message)
def dispatch(self, name):
self.dispatcher.dispatch(name, events.ExecutionEvent(self))
def start(self, starter=None): def start(self, starter=None):
self.register_plugins()
self.dispatch(events.START)
self.tick()
for node in self.nodes: for node in self.nodes:
if starter is None: if starter is None:
node.start() node.start()
else: else:
starter(node) starter(node)
self.dispatch(events.STARTED)
def start_plugins(self, starter=None): def tick(self):
for plugin in self.plugins: self.dispatch(events.TICK)
if starter is None: sleep(self.TICK_PERIOD)
plugin.start()
else: def kill(self):
starter(plugin) self.dispatch(events.KILL)
for node_context in self.nodes:
node_context.kill()
self.tick()
def stop(self, stopper=None): def stop(self, stopper=None):
for node in self.nodes: self.dispatch(events.STOP)
for node_context in self.nodes:
if stopper is None: if stopper is None:
node.stop() node_context.stop()
else: else:
stopper(node) stopper(node_context)
self.tick()
self.dispatch(events.STOPPED)
self.unregister_plugins()
def register_plugins(self):
for plugin_context in self.plugins:
plugin_context.register()
def unregister_plugins(self):
for plugin_context in self.plugins:
plugin_context.unregister()

View File

@ -1,10 +1,9 @@
import traceback import sys
import threading
from queue import Empty from queue import Empty
from time import sleep from time import sleep
from types import GeneratorType from types import GeneratorType
import sys
from bonobo.constants import NOT_MODIFIED, BEGIN, END from bonobo.constants import NOT_MODIFIED, BEGIN, END
from bonobo.errors import InactiveReadableError, UnrecoverableError from bonobo.errors import InactiveReadableError, UnrecoverableError
from bonobo.execution.base import LoopingExecutionContext from bonobo.execution.base import LoopingExecutionContext
@ -22,13 +21,8 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
""" """
@property @property
def alive(self): def killed(self):
"""todo check if this is right, and where it is used""" return self._killed
return self._started and not self._stopped
@property
def alive_str(self):
return '+' if self.alive else '-'
def __init__(self, wrapped, parent=None, services=None, _input=None, _outputs=None): def __init__(self, wrapped, parent=None, services=None, _input=None, _outputs=None):
LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services) LoopingExecutionContext.__init__(self, wrapped, parent=parent, services=services)
@ -36,13 +30,19 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
self.input = _input or Input() self.input = _input or Input()
self.outputs = _outputs or [] self.outputs = _outputs or []
self._killed = False
def __str__(self): def __str__(self):
return self.alive_str + ' ' + self.__name__ + self.get_statistics_as_string(prefix=' ') return self.__name__ + self.get_statistics_as_string(prefix=' ')
def __repr__(self): def __repr__(self):
name, type_name = get_name(self), get_name(type(self)) name, type_name = get_name(self), get_name(type(self))
return '<{}({}{}){}>'.format(type_name, self.alive_str, name, self.get_statistics_as_string(prefix=' ')) return '<{}({}{}){}>'.format(type_name, self.status, name, self.get_statistics_as_string(prefix=' '))
def get_flags_as_string(self):
if self.killed:
return '[killed]'
return ''
def write(self, *messages): def write(self, *messages):
""" """
@ -92,22 +92,26 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
return row return row
def loop(self): def loop(self):
while True: while not self._killed:
try: try:
self.step() self.step()
except KeyboardInterrupt: except KeyboardInterrupt:
raise self.handle_error(*sys.exc_info())
break
except InactiveReadableError: except InactiveReadableError:
break break
except Empty: except Empty:
sleep(self.PERIOD) sleep(self.PERIOD)
continue continue
except UnrecoverableError as exc: except UnrecoverableError:
self.handle_error(*sys.exc_info()) self.handle_error(*sys.exc_info())
self.input.shutdown() self.input.shutdown()
break break
except Exception as exc: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
self.handle_error(*sys.exc_info()) self.handle_error(*sys.exc_info())
except BaseException:
self.handle_error(*sys.exc_info())
break
def step(self): def step(self):
# Pull data from the first available input channel. # Pull data from the first available input channel.
@ -119,6 +123,15 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
# todo add timer # todo add timer
self.handle_results(input_bag, input_bag.apply(self._stack)) self.handle_results(input_bag, input_bag.apply(self._stack))
def kill(self):
if not self.started:
raise RuntimeError('Cannot kill a node context that has not started yet.')
if self.stopped:
raise RuntimeError('Cannot kill a node context that has already stopped.')
self._killed = True
def handle_results(self, input_bag, results): def handle_results(self, input_bag, results):
# self._exec_time += timer.duration # self._exec_time += timer.duration
# Put data onto output channels # Put data onto output channels
@ -126,6 +139,9 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
if isinstance(results, GeneratorType): if isinstance(results, GeneratorType):
while True: while True:
try: try:
# if kill flag was step, stop iterating.
if self._killed:
break
result = next(results) result = next(results)
except StopIteration: except StopIteration:
break break
@ -140,7 +156,7 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
def isflag(param): def isflag(param):
return isinstance(param, Token) and param in (NOT_MODIFIED, ) return isinstance(param, Token) and param in (NOT_MODIFIED,)
def split_tokens(output): def split_tokens(output):
@ -152,11 +168,11 @@ def split_tokens(output):
""" """
if isinstance(output, Token): if isinstance(output, Token):
# just a flag # just a flag
return (output, ), () return (output,), ()
if not istuple(output): if not istuple(output):
# no flag # no flag
return (), (output, ) return (), (output,)
i = 0 i = 0
while isflag(output[i]): while isflag(output[i]):

View File

@ -2,25 +2,12 @@ from bonobo.execution.base import LoopingExecutionContext, recoverable
class PluginExecutionContext(LoopingExecutionContext): class PluginExecutionContext(LoopingExecutionContext):
PERIOD = 0.5 @property
def dispatcher(self):
return self.parent.dispatcher
def __init__(self, wrapped, parent): def register(self):
# Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure return self.wrapped.register(self.dispatcher)
# plugins, for example if it depends on an external service.
super().__init__(wrapped(self), parent)
def start(self): def unregister(self):
super().start() return self.wrapped.unregister(self.dispatcher)
with recoverable(self.handle_error):
self.wrapped.on_start()
def shutdown(self):
if self.started:
with recoverable(self.handle_error):
self.wrapped.on_stop()
self.alive = False
def step(self):
with recoverable(self.handle_error):
self.wrapped.on_tick()

View File

@ -5,7 +5,7 @@ from django.core.management.base import BaseCommand, OutputWrapper
import bonobo import bonobo
import bonobo.util import bonobo.util
from bonobo.ext.console import ConsoleOutputPlugin from bonobo.plugins.console import ConsoleOutputPlugin
from bonobo.util.term import CLEAR_EOL from bonobo.util.term import CLEAR_EOL

View File

@ -10,5 +10,14 @@ class Plugin:
""" """
def __init__(self, context): def register(self, dispatcher):
self.context = context """
:param dispatcher: whistle.EventDispatcher
"""
pass
def unregister(self, dispatcher):
"""
:param dispatcher: whistle.EventDispatcher
"""
pass

View File

@ -2,38 +2,14 @@ import io
import sys import sys
from contextlib import redirect_stdout, redirect_stderr from contextlib import redirect_stdout, redirect_stderr
from colorama import Style, Fore, init from colorama import Style, Fore, init as initialize_colorama_output_wrappers
init(wrap=True)
from bonobo import settings from bonobo import settings
from bonobo.execution import events
from bonobo.plugins import Plugin from bonobo.plugins import Plugin
from bonobo.util.term import CLEAR_EOL, MOVE_CURSOR_UP from bonobo.util.term import CLEAR_EOL, MOVE_CURSOR_UP
initialize_colorama_output_wrappers(wrap=True)
class IOBuffer():
"""
The role of IOBuffer is to overcome the problem of multiple threads wanting to write to stdout at the same time. It
works a bit like a videogame: there are two buffers, one that is used to write, and one which is used to read from.
On each cycle, we swap the buffers, and the console plugin handle output of the one which is not anymore "active".
"""
def __init__(self):
self.current = io.StringIO()
self.write = self.current.write
def switch(self):
previous = self.current
self.current = io.StringIO()
self.write = self.current.write
try:
return previous.getvalue()
finally:
previous.close()
def flush(self):
self.current.flush()
class ConsoleOutputPlugin(Plugin): class ConsoleOutputPlugin(Plugin):
@ -60,13 +36,24 @@ class ConsoleOutputPlugin(Plugin):
# Whether we're on windows, or a real operating system. # Whether we're on windows, or a real operating system.
iswindows = (sys.platform == 'win32') iswindows = (sys.platform == 'win32')
def on_start(self): def __init__(self):
self.isatty = self._stdout.isatty()
def register(self, dispatcher):
dispatcher.add_listener(events.START, self.setup)
dispatcher.add_listener(events.TICK, self.tick)
dispatcher.add_listener(events.STOPPED, self.teardown)
def unregister(self, dispatcher):
dispatcher.remove_listener(events.STOPPED, self.teardown)
dispatcher.remove_listener(events.TICK, self.tick)
dispatcher.remove_listener(events.START, self.setup)
def setup(self, event):
self.prefix = '' self.prefix = ''
self.counter = 0 self.counter = 0
self._append_cache = '' self._append_cache = ''
self.isatty = self._stdout.isatty()
self.stdout = IOBuffer() self.stdout = IOBuffer()
self.redirect_stdout = redirect_stdout(self._stdout if self.iswindows else self.stdout) self.redirect_stdout = redirect_stdout(self._stdout if self.iswindows else self.stdout)
self.redirect_stdout.__enter__() self.redirect_stdout.__enter__()
@ -75,14 +62,14 @@ class ConsoleOutputPlugin(Plugin):
self.redirect_stderr = redirect_stderr(self._stderr if self.iswindows else self.stderr) self.redirect_stderr = redirect_stderr(self._stderr if self.iswindows else self.stderr)
self.redirect_stderr.__enter__() self.redirect_stderr.__enter__()
def on_tick(self): def tick(self, event):
if self.isatty and not self.iswindows: if self.isatty and not self.iswindows:
self._write(self.context.parent, rewind=True) self._write(event.graph_context, rewind=True)
else: else:
pass # not a tty, or windows, so we'll ignore stats output pass # not a tty, or windows, so we'll ignore stats output
def on_stop(self): def teardown(self, event):
self._write(self.context.parent, rewind=False) self._write(event.graph_context, rewind=False)
self.redirect_stderr.__exit__(None, None, None) self.redirect_stderr.__exit__(None, None, None)
self.redirect_stdout.__exit__(None, None, None) self.redirect_stdout.__exit__(None, None, None)
@ -113,6 +100,8 @@ class ConsoleOutputPlugin(Plugin):
name_suffix, name_suffix,
' ', ' ',
node.get_statistics_as_string(), node.get_statistics_as_string(),
' ',
node.get_flags_as_string(),
Style.RESET_ALL, Style.RESET_ALL,
' ', ' ',
) )
@ -128,6 +117,8 @@ class ConsoleOutputPlugin(Plugin):
name_suffix, name_suffix,
' ', ' ',
node.get_statistics_as_string(), node.get_statistics_as_string(),
' ',
node.get_flags_as_string(),
Style.RESET_ALL, Style.RESET_ALL,
' ', ' ',
) )
@ -166,7 +157,32 @@ class ConsoleOutputPlugin(Plugin):
self.counter += 1 self.counter += 1
class IOBuffer():
"""
The role of IOBuffer is to overcome the problem of multiple threads wanting to write to stdout at the same time. It
works a bit like a videogame: there are two buffers, one that is used to write, and one which is used to read from.
On each cycle, we swap the buffers, and the console plugin handle output of the one which is not anymore "active".
"""
def __init__(self):
self.current = io.StringIO()
self.write = self.current.write
def switch(self):
previous = self.current
self.current = io.StringIO()
self.write = self.current.write
try:
return previous.getvalue()
finally:
previous.close()
def flush(self):
self.current.flush()
def memory_usage(): def memory_usage():
import os, psutil import os, psutil
process = psutil.Process(os.getpid()) process = psutil.Process(os.getpid())
return process.memory_info()[0] / float(2**20) return process.memory_info()[0] / float(2 ** 20)

View File

@ -1,10 +1,8 @@
import time import functools
import logging
import sys import sys
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor, wait, FIRST_EXCEPTION
import mondrian from time import sleep
import traceback
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from bonobo.util import get_name from bonobo.util import get_name
from bonobo.constants import BEGIN, END from bonobo.constants import BEGIN, END
@ -27,60 +25,38 @@ class ExecutorStrategy(Strategy):
context = self.create_graph_execution_context(graph, **kwargs) context = self.create_graph_execution_context(graph, **kwargs)
context.write(BEGIN, Bag(), END) context.write(BEGIN, Bag(), END)
executor = self.create_executor()
futures = [] futures = []
context.start_plugins(self.get_plugin_starter(executor, futures)) with self.create_executor() as executor:
context.start(self.get_starter(executor, futures)) context.start(self.get_starter(executor, futures))
while context.alive: while context.alive:
time.sleep(0.1) try:
context.tick()
except KeyboardInterrupt:
logging.getLogger(__name__).warning('KeyboardInterrupt received. Trying to terminate the nodes gracefully.')
context.kill()
break
for plugin_context in context.plugins: context.stop()
plugin_context.shutdown()
context.stop()
executor.shutdown()
return context return context
def get_starter(self, executor, futures): def get_starter(self, executor, futures):
def starter(node): def starter(node):
@functools.wraps(node)
def _runner(): def _runner():
try: try:
node.start() with node:
except Exception: node.loop()
mondrian.excepthook(*sys.exc_info(), context='Could not start node {}.'.format(get_name(node))) except BaseException as exc:
node.input.on_end() logging.getLogger(__name__).info('Got {} in {} runner.'.format(get_name(exc), node),
else: exc_info=sys.exc_info())
node.loop()
try:
node.stop()
except Exception:
mondrian.excepthook(*sys.exc_info(), context='Could not stop node {}.'.format(get_name(node)))
futures.append(executor.submit(_runner)) futures.append(executor.submit(_runner))
return starter return starter
def get_plugin_starter(self, executor, futures):
def plugin_starter(plugin):
def _runner():
with plugin:
try:
plugin.loop()
except Exception:
mondrian.excepthook(
*sys.exc_info(), context='In plugin loop for {}...'.format(get_name(plugin))
)
futures.append(executor.submit(_runner))
return plugin_starter
class ThreadPoolExecutorStrategy(ExecutorStrategy): class ThreadPoolExecutorStrategy(ExecutorStrategy):
executor_factory = ThreadPoolExecutor executor_factory = ThreadPoolExecutor

View File

@ -1 +0,0 @@

View File

@ -7,7 +7,7 @@ fs==2.0.12
idna==2.6 idna==2.6
jinja2==2.9.6 jinja2==2.9.6
markupsafe==1.0 markupsafe==1.0
mondrian==0.4a0 mondrian==0.3.0
packaging==16.8 packaging==16.8
pbr==3.1.1 pbr==3.1.1
psutil==5.4.0 psutil==5.4.0
@ -17,3 +17,4 @@ requests==2.18.4
six==1.11.0 six==1.11.0
stevedore==1.27.1 stevedore==1.27.1
urllib3==1.22 urllib3==1.22
whistle==1.0a3

View File

@ -53,8 +53,8 @@ setup(
packages=find_packages(exclude=['ez_setup', 'example', 'test']), packages=find_packages(exclude=['ez_setup', 'example', 'test']),
include_package_data=True, include_package_data=True,
install_requires=[ install_requires=[
'colorama (>= 0.3)', 'fs (>= 2.0, < 2.1)', 'jinja2 (>= 2.9, < 2.10)', 'mondrian (== 0.4a0)', 'colorama (>= 0.3)', 'fs (>= 2.0, < 2.1)', 'jinja2 (>= 2.9, < 2.10)', 'mondrian', 'packaging (>= 16, < 17)',
'packaging (>= 16, < 17)', 'psutil (>= 5.4, < 6.0)', 'requests (>= 2.0, < 3.0)', 'stevedore (>= 1.27, < 1.28)' 'psutil (>= 5.4, < 6.0)', 'requests (>= 2.0, < 3.0)', 'stevedore (>= 1.27, < 1.28)', 'whistle (== 1.0a3)'
], ],
extras_require={ extras_require={
'dev': [ 'dev': [