@ -23,17 +23,17 @@ def register_api_group(*args):
|
||||
def run(graph, strategy=None, plugins=None, services=None):
|
||||
"""
|
||||
Main entry point of bonobo. It takes a graph and creates all the necessary plumbery around to execute it.
|
||||
|
||||
|
||||
The only necessary argument is a :class:`Graph` instance, containing the logic you actually want to execute.
|
||||
|
||||
|
||||
By default, this graph will be executed using the "threadpool" strategy: each graph node will be wrapped in a
|
||||
thread, and executed in a loop until there is no more input to this node.
|
||||
|
||||
|
||||
You can provide plugins factory objects in the plugins list, this function will add the necessary plugins for
|
||||
interactive console execution and jupyter notebook execution if it detects correctly that it runs in this context.
|
||||
|
||||
|
||||
You'll probably want to provide a services dictionary mapping service names to service instances.
|
||||
|
||||
|
||||
:param Graph graph: The :class:`Graph` to execute.
|
||||
:param str strategy: The :class:`bonobo.strategies.base.Strategy` to use.
|
||||
:param list plugins: The list of plugins to enhance execution.
|
||||
@ -58,7 +58,10 @@ def run(graph, strategy=None, plugins=None, services=None):
|
||||
from bonobo.ext.jupyter import JupyterOutputPlugin
|
||||
except ImportError:
|
||||
logging.warning(
|
||||
'Failed to load jupyter widget. Easiest way is to install the optional "jupyter" ' 'dependencies with «pip install bonobo[jupyter]», but you can also install a specific ' 'version by yourself.')
|
||||
'Failed to load jupyter widget. Easiest way is to install the optional "jupyter" '
|
||||
'dependencies with «pip install bonobo[jupyter]», but you can also install a specific '
|
||||
'version by yourself.'
|
||||
)
|
||||
else:
|
||||
if JupyterOutputPlugin not in plugins:
|
||||
plugins.append(JupyterOutputPlugin)
|
||||
@ -78,7 +81,7 @@ register_api(create_strategy)
|
||||
def open_fs(fs_url=None, *args, **kwargs):
|
||||
"""
|
||||
Wraps :func:`fs.open_fs` function with a few candies.
|
||||
|
||||
|
||||
:param str fs_url: A filesystem URL
|
||||
:param parse_result: A parsed filesystem URL.
|
||||
:type parse_result: :class:`ParseResult`
|
||||
|
||||
@ -2,7 +2,9 @@ import io
|
||||
import sys
|
||||
from contextlib import redirect_stdout
|
||||
|
||||
from colorama import Style, Fore
|
||||
from colorama import Style, Fore, init
|
||||
|
||||
init(wrap=True)
|
||||
|
||||
from bonobo import settings
|
||||
from bonobo.plugins import Plugin
|
||||
@ -10,6 +12,13 @@ from bonobo.util.term import CLEAR_EOL, MOVE_CURSOR_UP
|
||||
|
||||
|
||||
class IOBuffer():
|
||||
"""
|
||||
The role of IOBuffer is to overcome the problem of multiple threads wanting to write to stdout at the same time. It
|
||||
works a bit like a videogame: there are two buffers, one that is used to write, and one which is used to read from.
|
||||
On each cycle, we swap the buffers, and the console plugin handle output of the one which is not anymore "active".
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.current = io.StringIO()
|
||||
self.write = self.current.write
|
||||
@ -32,6 +41,9 @@ class ConsoleOutputPlugin(Plugin):
|
||||
Outputs status information to the connected stdout. Can be a TTY, with or without support for colors/cursor
|
||||
movements, or a non tty (pipe, file, ...). The features are adapted to terminal capabilities.
|
||||
|
||||
On Windows, we'll play a bit differently because we don't know how to manipulate cursor position. We'll only
|
||||
display stats at the very end, and there won't be this "buffering" logic we need to display both stats and stdout.
|
||||
|
||||
.. attribute:: prefix
|
||||
|
||||
String prefix of output lines.
|
||||
@ -43,17 +55,18 @@ class ConsoleOutputPlugin(Plugin):
|
||||
self.counter = 0
|
||||
self._append_cache = ''
|
||||
self.isatty = sys.stdout.isatty()
|
||||
self.iswindows = (sys.platform == 'win32')
|
||||
|
||||
self._stdout = sys.stdout
|
||||
self.stdout = IOBuffer()
|
||||
self.redirect_stdout = redirect_stdout(self.stdout)
|
||||
self.redirect_stdout = redirect_stdout(self._stdout if self.iswindows else self.stdout)
|
||||
self.redirect_stdout.__enter__()
|
||||
|
||||
def run(self):
|
||||
if self.isatty:
|
||||
if self.isatty and not self.iswindows:
|
||||
self._write(self.context.parent, rewind=True)
|
||||
else:
|
||||
pass # not a tty
|
||||
pass # not a tty, or windows, so we'll ignore stats output
|
||||
|
||||
def finalize(self):
|
||||
self._write(self.context.parent, rewind=False)
|
||||
@ -62,9 +75,13 @@ class ConsoleOutputPlugin(Plugin):
|
||||
def write(self, context, prefix='', rewind=True, append=None):
|
||||
t_cnt = len(context)
|
||||
|
||||
buffered = self.stdout.switch()
|
||||
for line in buffered.split('\n')[:-1]:
|
||||
print(line + CLEAR_EOL, file=sys.stderr)
|
||||
if not self.iswindows:
|
||||
buffered = self.stdout.switch()
|
||||
for line in buffered.split('\n')[:-1]:
|
||||
print(line + CLEAR_EOL, file=sys.stderr)
|
||||
|
||||
alive_color = Style.BRIGHT
|
||||
dead_color = (Style.BRIGHT + Fore.BLACK) if self.iswindows else Fore.BLACK
|
||||
|
||||
for i in context.graph.topologically_sorted_indexes:
|
||||
node = context[i]
|
||||
@ -72,14 +89,14 @@ class ConsoleOutputPlugin(Plugin):
|
||||
if node.alive:
|
||||
_line = ''.join(
|
||||
(
|
||||
' ', Style.BRIGHT, '+', Style.RESET_ALL, ' ', node.name, name_suffix, ' ',
|
||||
' ', alive_color, '+', Style.RESET_ALL, ' ', node.name, name_suffix, ' ',
|
||||
node.get_statistics_as_string(), Style.RESET_ALL, ' ',
|
||||
)
|
||||
)
|
||||
else:
|
||||
_line = ''.join(
|
||||
(
|
||||
' ', Fore.BLACK, '-', ' ', node.name, name_suffix, ' ', node.get_statistics_as_string(),
|
||||
' ', dead_color, '-', ' ', node.name, name_suffix, ' ', node.get_statistics_as_string(),
|
||||
Style.RESET_ALL, ' ',
|
||||
)
|
||||
)
|
||||
|
||||
@ -8,19 +8,22 @@ from colorama import Fore, Style
|
||||
from bonobo import settings
|
||||
from bonobo.util.term import CLEAR_EOL
|
||||
|
||||
iswindows = (sys.platform == 'win32')
|
||||
|
||||
|
||||
def get_format():
|
||||
yield '{b}[%(fg)s%(levelname)s{b}][{w}'
|
||||
yield '{b}][{w}'.join(('%(spent)04d', '%(name)s'))
|
||||
yield '{b}]'
|
||||
yield ' %(fg)s%(message)s{r}'
|
||||
yield CLEAR_EOL
|
||||
if not iswindows:
|
||||
yield CLEAR_EOL
|
||||
|
||||
|
||||
colors = {
|
||||
'b': Fore.BLACK,
|
||||
'w': Fore.LIGHTBLACK_EX,
|
||||
'r': Style.RESET_ALL,
|
||||
'b': '' if iswindows else Fore.BLACK,
|
||||
'w': '' if iswindows else Fore.LIGHTBLACK_EX,
|
||||
'r': '' if iswindows else Style.RESET_ALL,
|
||||
}
|
||||
format = (''.join(get_format())).format(**colors)
|
||||
|
||||
@ -28,7 +31,9 @@ format = (''.join(get_format())).format(**colors)
|
||||
class Filter(logging.Filter):
|
||||
def filter(self, record):
|
||||
record.spent = record.relativeCreated // 1000
|
||||
if record.levelname == 'DEBG':
|
||||
if iswindows:
|
||||
record.fg = ''
|
||||
elif record.levelname == 'DEBG':
|
||||
record.fg = Fore.LIGHTBLACK_EX
|
||||
elif record.levelname == 'INFO':
|
||||
record.fg = Fore.LIGHTWHITE_EX
|
||||
@ -46,7 +51,10 @@ class Filter(logging.Filter):
|
||||
class Formatter(logging.Formatter):
|
||||
def formatException(self, ei):
|
||||
tb = super().formatException(ei)
|
||||
return textwrap.indent(tb, Fore.BLACK + ' | ' + Fore.WHITE)
|
||||
if iswindows:
|
||||
return textwrap.indent(tb, ' | ')
|
||||
else:
|
||||
return textwrap.indent(tb, Fore.BLACK + ' | ' + Fore.WHITE)
|
||||
|
||||
|
||||
def setup(level):
|
||||
|
||||
@ -78,8 +78,7 @@ def test_install_requirements_for_dir(runner):
|
||||
dirname = get_examples_path('types')
|
||||
with patch('bonobo.commands.run._install_requirements') as install_mock:
|
||||
runner('run', '--install', dirname)
|
||||
install_mock.assert_called_once_with(
|
||||
os.path.join(dirname, 'requirements.txt'))
|
||||
install_mock.assert_called_once_with(os.path.join(dirname, 'requirements.txt'))
|
||||
|
||||
|
||||
@all_runners
|
||||
@ -87,8 +86,7 @@ def test_install_requirements_for_file(runner):
|
||||
dirname = get_examples_path('types')
|
||||
with patch('bonobo.commands.run._install_requirements') as install_mock:
|
||||
runner('run', '--install', os.path.join(dirname, 'strings.py'))
|
||||
install_mock.assert_called_once_with(
|
||||
os.path.join(dirname, 'requirements.txt'))
|
||||
install_mock.assert_called_once_with(os.path.join(dirname, 'requirements.txt'))
|
||||
|
||||
|
||||
@all_runners
|
||||
|
||||
Reference in New Issue
Block a user