Merge branch 'dev_graphviz' into dev_convert

This commit is contained in:
Romain Dorgueil
2017-09-18 17:18:29 +02:00
26 changed files with 397 additions and 82 deletions

View File

@ -1,6 +1,8 @@
import logging
from bonobo.structs import Bag, Graph, Token
from bonobo.nodes import CsvReader, CsvWriter, FileReader, FileWriter, Filter, JsonReader, JsonWriter, Limit, \
PrettyPrinter, PickleWriter, PickleReader, RateLimited, Tee, count, identity, noop
PickleReader, PickleWriter, PrettyPrinter, RateLimited, Tee, arg0_to_kwargs, count, identity, kwargs_to_arg0, noop
from bonobo.strategies import create_strategy
from bonobo.util.objects import get_name
@ -21,17 +23,17 @@ def register_api_group(*args):
def run(graph, strategy=None, plugins=None, services=None):
"""
Main entry point of bonobo. It takes a graph and creates all the necessary plumbery around to execute it.
The only necessary argument is a :class:`Graph` instance, containing the logic you actually want to execute.
By default, this graph will be executed using the "threadpool" strategy: each graph node will be wrapped in a
thread, and executed in a loop until there is no more input to this node.
You can provide plugins factory objects in the plugins list, this function will add the necessary plugins for
interactive console execution and jupyter notebook execution if it detects correctly that it runs in this context.
You'll probably want to provide a services dictionary mapping service names to service instances.
:param Graph graph: The :class:`Graph` to execute.
:param str strategy: The :class:`bonobo.strategies.base.Strategy` to use.
:param list plugins: The list of plugins to enhance execution.
@ -52,9 +54,17 @@ def run(graph, strategy=None, plugins=None, services=None):
plugins.append(ConsoleOutputPlugin)
if _is_jupyter_notebook():
from bonobo.ext.jupyter import JupyterOutputPlugin
if JupyterOutputPlugin not in plugins:
plugins.append(JupyterOutputPlugin)
try:
from bonobo.ext.jupyter import JupyterOutputPlugin
except ImportError:
logging.warning(
'Failed to load jupyter widget. Easiest way is to install the optional "jupyter" '
'dependencies with «pip install bonobo[jupyter]», but you can also install a specific '
'version by yourself.'
)
else:
if JupyterOutputPlugin not in plugins:
plugins.append(JupyterOutputPlugin)
return strategy.execute(graph, plugins=plugins, services=services)
@ -71,7 +81,7 @@ register_api(create_strategy)
def open_fs(fs_url=None, *args, **kwargs):
"""
Wraps :func:`fs.open_fs` function with a few candies.
:param str fs_url: A filesystem URL
:param parse_result: A parsed filesystem URL.
:type parse_result: :class:`ParseResult`
@ -101,13 +111,15 @@ register_api_group(
JsonReader,
JsonWriter,
Limit,
PrettyPrinter,
PickleReader,
PickleWriter,
PrettyPrinter,
RateLimited,
Tee,
arg0_to_kwargs,
count,
identity,
kwargs_to_arg0,
noop,
)

32
bonobo/commands/graph.py Normal file
View File

@ -0,0 +1,32 @@
import json
import itertools
from bonobo.util.objects import get_name
from bonobo.commands.run import read, register_generic_run_arguments
from bonobo.constants import BEGIN
def execute(filename, module, install=False, quiet=False, verbose=False):
graph, plugins, services = read(filename, module, install, quiet, verbose)
print('digraph {')
print(' rankdir = LR;')
print(' "BEGIN" [shape="point"];')
for i in graph.outputs_of(BEGIN):
print(' "BEGIN" -> ' + json.dumps(get_name(graph[i])) + ';')
for ix in graph.topologically_sorted_indexes:
for iy in graph.outputs_of(ix):
print(' {} -> {};'.format(
json.dumps(get_name(graph[ix])),
json.dumps(get_name(graph[iy]))
))
print('}')
def register(parser):
register_generic_run_arguments(parser)
return execute

View File

@ -1,9 +1,9 @@
import os
DEFAULT_SERVICES_FILENAME = '_services.py'
DEFAULT_SERVICES_ATTR = 'get_services'
import bonobo
from bonobo.constants import DEFAULT_SERVICES_ATTR, DEFAULT_SERVICES_FILENAME
DEFAULT_GRAPH_FILENAMES = ('__main__.py', 'main.py', )
DEFAULT_GRAPH_FILENAMES = ('__main__.py', 'main.py',)
DEFAULT_GRAPH_ATTR = 'get_graph'
@ -26,9 +26,23 @@ def get_default_services(filename, services=None):
return services or {}
def execute(filename, module, install=False, quiet=False, verbose=False):
def _install_requirements(requirements):
"""Install requirements given a path to requirements.txt file."""
import importlib
import pip
pip.main(['install', '-r', requirements])
# Some shenanigans to be sure everything is importable after this, especially .egg-link files which
# are referenced in *.pth files and apparently loaded by site.py at some magic bootstrap moment of the
# python interpreter.
pip.utils.pkg_resources = importlib.reload(pip.utils.pkg_resources)
import site
importlib.reload(site)
def read(filename, module, install=False, quiet=False, verbose=False):
import runpy
from bonobo import Graph, run, settings
from bonobo import Graph, settings
if quiet:
settings.QUIET.set(True)
@ -39,16 +53,8 @@ def execute(filename, module, install=False, quiet=False, verbose=False):
if filename:
if os.path.isdir(filename):
if install:
import importlib
import pip
requirements = os.path.join(filename, 'requirements.txt')
pip.main(['install', '-r', requirements])
# Some shenanigans to be sure everything is importable after this, especially .egg-link files which
# are referenced in *.pth files and apparently loaded by site.py at some magic bootstrap moment of the
# python interpreter.
pip.utils.pkg_resources = importlib.reload(pip.utils.pkg_resources)
import site
importlib.reload(site)
_install_requirements(requirements)
pathname = filename
for filename in DEFAULT_GRAPH_FILENAMES:
@ -58,7 +64,8 @@ def execute(filename, module, install=False, quiet=False, verbose=False):
if not os.path.exists(filename):
raise IOError('Could not find entrypoint (candidates: {}).'.format(', '.join(DEFAULT_GRAPH_FILENAMES)))
elif install:
raise RuntimeError('Cannot --install on a file (only available for dirs containing requirements.txt).')
requirements = os.path.join(os.path.dirname(filename), 'requirements.txt')
_install_requirements(requirements)
context = runpy.run_path(filename, run_name='__bonobo__')
elif module:
context = runpy.run_module(module, run_name='__bonobo__')
@ -74,15 +81,21 @@ def execute(filename, module, install=False, quiet=False, verbose=False):
).format(len(graphs))
graph = list(graphs.values())[0]
plugins = []
services = get_default_services(
filename, context.get(DEFAULT_SERVICES_ATTR)() if DEFAULT_SERVICES_ATTR in context else None
)
# todo if console and not quiet, then add the console plugin
# todo when better console plugin, add it if console and just disable display
return run(
return graph, plugins, services
def execute(filename, module, install=False, quiet=False, verbose=False):
graph, plugins, services = read(filename, module, install, quiet, verbose)
return bonobo.run(
graph,
plugins=[],
services=get_default_services(
filename, context.get(DEFAULT_SERVICES_ATTR)() if DEFAULT_SERVICES_ATTR in context else None
)
plugins=plugins,
services=services
)

View File

@ -4,3 +4,5 @@ BEGIN = Token('Begin')
END = Token('End')
INHERIT_INPUT = Token('InheritInput')
NOT_MODIFIED = Token('NotModified')
DEFAULT_SERVICES_FILENAME = '_services.py'
DEFAULT_SERVICES_ATTR = 'get_services'

View File

@ -16,8 +16,9 @@ class PluginExecutionContext(LoopingExecutionContext):
self.wrapped.initialize()
def shutdown(self):
with recoverable(self.handle_error):
self.wrapped.finalize()
if self.started:
with recoverable(self.handle_error):
self.wrapped.finalize()
self.alive = False
def step(self):

View File

@ -2,7 +2,9 @@ import io
import sys
from contextlib import redirect_stdout
from colorama import Style, Fore
from colorama import Style, Fore, init
init(wrap=True)
from bonobo import settings
from bonobo.plugins import Plugin
@ -10,6 +12,13 @@ from bonobo.util.term import CLEAR_EOL, MOVE_CURSOR_UP
class IOBuffer():
"""
The role of IOBuffer is to overcome the problem of multiple threads wanting to write to stdout at the same time. It
works a bit like a videogame: there are two buffers, one that is used to write, and one which is used to read from.
On each cycle, we swap the buffers, and the console plugin handle output of the one which is not anymore "active".
"""
def __init__(self):
self.current = io.StringIO()
self.write = self.current.write
@ -23,12 +32,18 @@ class IOBuffer():
finally:
previous.close()
def flush(self):
self.current.flush()
class ConsoleOutputPlugin(Plugin):
"""
Outputs status information to the connected stdout. Can be a TTY, with or without support for colors/cursor
movements, or a non tty (pipe, file, ...). The features are adapted to terminal capabilities.
On Windows, we'll play a bit differently because we don't know how to manipulate cursor position. We'll only
display stats at the very end, and there won't be this "buffering" logic we need to display both stats and stdout.
.. attribute:: prefix
String prefix of output lines.
@ -40,17 +55,18 @@ class ConsoleOutputPlugin(Plugin):
self.counter = 0
self._append_cache = ''
self.isatty = sys.stdout.isatty()
self.iswindows = (sys.platform == 'win32')
self._stdout = sys.stdout
self.stdout = IOBuffer()
self.redirect_stdout = redirect_stdout(self.stdout)
self.redirect_stdout = redirect_stdout(self._stdout if self.iswindows else self.stdout)
self.redirect_stdout.__enter__()
def run(self):
if self.isatty:
if self.isatty and not self.iswindows:
self._write(self.context.parent, rewind=True)
else:
pass # not a tty
pass # not a tty, or windows, so we'll ignore stats output
def finalize(self):
self._write(self.context.parent, rewind=False)
@ -59,9 +75,13 @@ class ConsoleOutputPlugin(Plugin):
def write(self, context, prefix='', rewind=True, append=None):
t_cnt = len(context)
buffered = self.stdout.switch()
for line in buffered.split('\n')[:-1]:
print(line + CLEAR_EOL, file=sys.stderr)
if not self.iswindows:
buffered = self.stdout.switch()
for line in buffered.split('\n')[:-1]:
print(line + CLEAR_EOL, file=sys.stderr)
alive_color = Style.BRIGHT
dead_color = (Style.BRIGHT + Fore.BLACK) if self.iswindows else Fore.BLACK
for i in context.graph.topologically_sorted_indexes:
node = context[i]
@ -69,14 +89,14 @@ class ConsoleOutputPlugin(Plugin):
if node.alive:
_line = ''.join(
(
' ', Style.BRIGHT, '+', Style.RESET_ALL, ' ', node.name, name_suffix, ' ',
' ', alive_color, '+', Style.RESET_ALL, ' ', node.name, name_suffix, ' ',
node.get_statistics_as_string(), Style.RESET_ALL, ' ',
)
)
else:
_line = ''.join(
(
' ', Fore.BLACK, '-', ' ', node.name, name_suffix, ' ', node.get_statistics_as_string(),
' ', dead_color, '-', ' ', node.name, name_suffix, ' ', node.get_statistics_as_string(),
Style.RESET_ALL, ' ',
)
)

View File

@ -1,11 +1,11 @@
import logging
from bonobo.ext.jupyter.widget import BonoboWidget
from bonobo.plugins import Plugin
try:
import IPython.core.display
except ImportError as e:
import logging
logging.exception(
'You must install Jupyter to use the bonobo Jupyter extension. Easiest way is to install the '
'optional "jupyter" dependencies with «pip install bonobo[jupyter]», but you can also install a '

View File

@ -8,19 +8,22 @@ from colorama import Fore, Style
from bonobo import settings
from bonobo.util.term import CLEAR_EOL
iswindows = (sys.platform == 'win32')
def get_format():
yield '{b}[%(fg)s%(levelname)s{b}][{w}'
yield '{b}][{w}'.join(('%(spent)04d', '%(name)s'))
yield '{b}]'
yield ' %(fg)s%(message)s{r}'
yield CLEAR_EOL
if not iswindows:
yield CLEAR_EOL
colors = {
'b': Fore.BLACK,
'w': Fore.LIGHTBLACK_EX,
'r': Style.RESET_ALL,
'b': '' if iswindows else Fore.BLACK,
'w': '' if iswindows else Fore.LIGHTBLACK_EX,
'r': '' if iswindows else Style.RESET_ALL,
}
format = (''.join(get_format())).format(**colors)
@ -28,7 +31,9 @@ format = (''.join(get_format())).format(**colors)
class Filter(logging.Filter):
def filter(self, record):
record.spent = record.relativeCreated // 1000
if record.levelname == 'DEBG':
if iswindows:
record.fg = ''
elif record.levelname == 'DEBG':
record.fg = Fore.LIGHTBLACK_EX
elif record.levelname == 'INFO':
record.fg = Fore.LIGHTWHITE_EX
@ -46,7 +51,10 @@ class Filter(logging.Filter):
class Formatter(logging.Formatter):
def formatException(self, ei):
tb = super().formatException(ei)
return textwrap.indent(tb, Fore.BLACK + ' | ' + Fore.WHITE)
if iswindows:
return textwrap.indent(tb, ' | ')
else:
return textwrap.indent(tb, Fore.BLACK + ' | ' + Fore.WHITE)
def setup(level):

View File

@ -91,8 +91,22 @@ def noop(*args, **kwargs): # pylint: disable=unused-argument
def arg0_to_kwargs(row):
"""
Transform items in a stream from "arg0" format (each call only has one positional argument, which is a dict-like
object) to "kwargs" format (each call only has keyword arguments that represent a row).
:param row:
:return: bonobo.Bag
"""
return Bag(**row)
def kwargs_to_arg0(**row):
"""
Transform items in a stream from "kwargs" format (each call only has keyword arguments that represent a row) to
"arg0" format (each call only has one positional argument, which is a dict-like object) .
:param **row:
:return: bonobo.Bag
"""
return Bag(row)

9
bonobo/util/graphviz.py Normal file
View File

@ -0,0 +1,9 @@
def render_as_dot(graph):
"""
:param bonobo.Graph graph:
:return: str
"""
pass