Refactoring plugin architecture, fixing jupyter integration and a few documentation about how to setup jupyter notebook widget.

This commit is contained in:
Romain Dorgueil
2017-04-24 20:49:05 +02:00
parent d7f8873cca
commit 6926058625
15 changed files with 116 additions and 63 deletions

View File

@ -67,17 +67,36 @@ def create_strategy(name=None):
return factory() return factory()
def _is_interactive_console():
import sys
return sys.stdout.isatty()
def _is_jupyter_notebook():
try:
return get_ipython().__class__.__name__ == 'ZMQInteractiveShell'
except NameError:
return False
def run(graph, *chain, strategy=None, plugins=None): def run(graph, *chain, strategy=None, plugins=None):
strategy = create_strategy(strategy)
if len(chain): if len(chain):
warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.') warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.')
from bonobo import Graph from bonobo import Graph
graph = Graph(graph, *chain) graph = Graph(graph, *chain)
return strategy.execute(graph, plugins=plugins) strategy = create_strategy(strategy)
plugins = []
if _is_interactive_console():
from bonobo.ext.console import ConsoleOutputPlugin
if ConsoleOutputPlugin not in plugins:
plugins.append(ConsoleOutputPlugin)
if _is_jupyter_notebook():
from bonobo.ext.jupyter import JupyterOutputPlugin
if JupyterOutputPlugin not in plugins:
plugins.append(JupyterOutputPlugin)
return strategy.execute(graph, plugins=plugins)
del sys del sys
del warnings del warnings

View File

@ -1,6 +1,5 @@
import argparse import argparse
import bonobo
from bonobo import Graph, run
def execute(file, quiet=False): def execute(file, quiet=False):
@ -22,7 +21,7 @@ def execute(file, quiet=False):
except Exception as exc: except Exception as exc:
raise raise
graphs = dict((k, v) for k, v in context.items() if isinstance(v, Graph)) graphs = dict((k, v) for k, v in context.items() if isinstance(v, bonobo.Graph))
assert len(graphs) == 1, ( assert len(graphs) == 1, (
'Having zero or more than one graph definition in one file is unsupported for now, ' 'Having zero or more than one graph definition in one file is unsupported for now, '
@ -34,7 +33,7 @@ def execute(file, quiet=False):
# todo if console and not quiet, then add the console plugin # todo if console and not quiet, then add the console plugin
# todo when better console plugin, add it if console and just disable display # todo when better console plugin, add it if console and just disable display
return run(graph) return bonobo.run(graph)
def register(parser): def register(parser):

View File

@ -76,7 +76,7 @@ class GraphExecutionContext:
def ensure_tuple(tuple_or_mixed): def ensure_tuple(tuple_or_mixed):
if isinstance(tuple_or_mixed, tuple): if isinstance(tuple_or_mixed, tuple):
return tuple_or_mixed return tuple_or_mixed
return (tuple_or_mixed, ) return (tuple_or_mixed,)
class LoopingExecutionContext(Wrapper): class LoopingExecutionContext(Wrapper):
@ -171,16 +171,24 @@ class LoopingExecutionContext(Wrapper):
class PluginExecutionContext(LoopingExecutionContext): class PluginExecutionContext(LoopingExecutionContext):
PERIOD = 0.5
def __init__(self, wrapped, parent): def __init__(self, wrapped, parent):
LoopingExecutionContext.__init__(self, wrapped, parent) # Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure
# plugins, for example if it depends on an external service.
super().__init__(wrapped(self), parent)
def shutdown(self): def shutdown(self):
self.wrapped.finalize(self) try:
self.alive = False self.wrapped.finalize()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
finally:
self.alive = False
def step(self): def step(self):
try: try:
self.wrapped.run(self) self.wrapped.run()
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc()) self.handle_error(exc, traceback.format_exc())

View File

@ -22,7 +22,6 @@ class ExecutorStrategy(Strategy):
def execute(self, graph, *args, plugins=None, **kwargs): def execute(self, graph, *args, plugins=None, **kwargs):
context = self.create_graph_execution_context(graph, plugins=plugins) context = self.create_graph_execution_context(graph, plugins=plugins)
context.recv(BEGIN, Bag(), END) context.recv(BEGIN, Bag(), END)
executor = self.create_executor() executor = self.create_executor()
@ -30,16 +29,13 @@ class ExecutorStrategy(Strategy):
futures = [] futures = []
for plugin_context in context.plugins: for plugin_context in context.plugins:
def _runner(plugin_context=plugin_context): def _runner(plugin_context=plugin_context):
plugin_context.start() plugin_context.start()
plugin_context.loop() plugin_context.loop()
plugin_context.stop() plugin_context.stop()
futures.append(executor.submit(_runner)) futures.append(executor.submit(_runner))
for node_context in context.nodes: for node_context in context.nodes:
def _runner(node_context=node_context): def _runner(node_context=node_context):
node_context.start() node_context.start()
node_context.loop() node_context.loop()
@ -67,6 +63,7 @@ class ProcessPoolExecutorStrategy(ExecutorStrategy):
class ThreadCollectionStrategy(Strategy): class ThreadCollectionStrategy(Strategy):
def execute(self, graph, *args, plugins=None, **kwargs): def execute(self, graph, *args, plugins=None, **kwargs):
print(type(self), 'execute', graph, args, plugins, kwargs)
context = self.create_graph_execution_context(graph, plugins=plugins) context = self.create_graph_execution_context(graph, plugins=plugins)
context.recv(BEGIN, Bag(), END) context.recv(BEGIN, Bag(), END)

View File

@ -1,19 +1,11 @@
from bonobo import Graph, ThreadPoolExecutorStrategy import bonobo
def yield_from(*args):
yield from args
# Represent our data processor as a simple directed graph of callables. # Represent our data processor as a simple directed graph of callables.
graph = Graph( graph = bonobo.Graph(
lambda: (x for x in ('foo', 'bar', 'baz')), ['foo', 'bar', 'baz'],
str.upper, str.upper,
print, print,
) )
# Use a thread pool. if __name__ == '__main__':
executor = ThreadPoolExecutorStrategy() bonobo.run(graph)
# Run the thing.
executor.execute(graph)

View File

@ -14,10 +14,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os import sys
from functools import lru_cache from functools import lru_cache
import blessings
import blessings
import os
import psutil import psutil
from bonobo.core.plugins import Plugin from bonobo.core.plugins import Plugin
@ -28,7 +29,7 @@ t = blessings.Terminal()
@lru_cache(1) @lru_cache(1)
def memory_usage(): def memory_usage():
process = psutil.Process(os.getpid()) process = psutil.Process(os.getpid())
return process.get_memory_info()[0] / float(2**20) return process.get_memory_info()[0] / float(2 ** 20)
# @lru_cache(64) # @lru_cache(64)
@ -47,10 +48,11 @@ class ConsoleOutputPlugin(Plugin):
""" """
def __init__(self, prefix=''): def __init__(self, context):
self.prefix = prefix self.context = context
self.prefix = ''
def _write(self, context, rewind): def _write(self, graph_context, rewind):
profile, debug = False, False profile, debug = False, False
if profile: if profile:
append = ( append = (
@ -59,18 +61,16 @@ class ConsoleOutputPlugin(Plugin):
) )
else: else:
append = () append = ()
self.write(context, prefix=self.prefix, append=append, debug=debug, profile=profile, rewind=rewind) self.write(graph_context, prefix=self.prefix, append=append, debug=debug, profile=profile, rewind=rewind)
# self.widget.value = [repr(component) for component in context.parent.components] def run(self):
if sys.stdout.isatty():
def run(self, context): self._write(self.context.parent, rewind=True)
if t.is_a_tty:
self._write(context.parent, rewind=True)
else: else:
pass # not a tty pass # not a tty
def finalize(self, context): def finalize(self):
self._write(context.parent, rewind=False) self._write(self.context.parent, rewind=False)
@staticmethod @staticmethod
def write(context, prefix='', rewind=True, append=None, debug=False, profile=False): def write(context, prefix='', rewind=True, append=None, debug=False, profile=False):

View File

@ -1 +0,0 @@

View File

@ -70,7 +70,6 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
// When serialiazing entire widget state for embedding, only values different from the // When serialiazing entire widget state for embedding, only values different from the
// defaults will be specified. // defaults will be specified.
var BonoboModel = widgets.DOMWidgetModel.extend({ var BonoboModel = widgets.DOMWidgetModel.extend({
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, { defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
_model_name: 'BonoboModel', _model_name: 'BonoboModel',

File diff suppressed because one or more lines are too long

View File

@ -9,7 +9,6 @@ var _ = require('underscore');
// When serialiazing entire widget state for embedding, only values different from the // When serialiazing entire widget state for embedding, only values different from the
// defaults will be specified. // defaults will be specified.
var BonoboModel = widgets.DOMWidgetModel.extend({ var BonoboModel = widgets.DOMWidgetModel.extend({
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, { defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
_model_name: 'BonoboModel', _model_name: 'BonoboModel',

View File

@ -14,11 +14,22 @@ except ImportError as e:
class JupyterOutputPlugin(Plugin): class JupyterOutputPlugin(Plugin):
def initialize(self, context): def __init__(self, context):
self.context = context
self.widget = BonoboWidget() self.widget = BonoboWidget()
IPython.core.display.display(self.widget) IPython.core.display.display(self.widget)
def run(self, context): def run(self):
self.widget.value = [repr(component) for component in context.parent.components] self.widget.value = [repr(node) for node in self.context.parent.nodes]
finalize = run finalize = run
"""
TODO JUPYTER WIDGET
###################
# close the widget? what does it do?
https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Basics.html#Closing-widgets
"""

View File

@ -73,7 +73,6 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
// When serialiazing entire widget state for embedding, only values different from the // When serialiazing entire widget state for embedding, only values different from the
// defaults will be specified. // defaults will be specified.
var BonoboModel = widgets.DOMWidgetModel.extend({ var BonoboModel = widgets.DOMWidgetModel.extend({
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, { defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
_model_name: 'BonoboModel', _model_name: 'BonoboModel',

File diff suppressed because one or more lines are too long

View File

@ -2,22 +2,30 @@ Changelog
========= =========
0.9.0 Incoming...
::::: :::::::::::
* todo migrate doc * todo migrate doc
* todo migrate tests * todo migrate tests
* todo migrate transforms ? * todo migrate transforms ?
Version 0.3
:::::::::::
* Autodetect if within jupyter notebook context, and apply plugin if it's the case.
* Console run should allow console plugin as a command line argument.
* New bonobo.structs package with simple data structures (bags, graphs, tokens).
Initial release Initial release
::::::::::::::: :::::::::::::::
* Migration from rdc.etl. * Migration from rdc.etl.
* New cool name. * New cool name (ok, that's debatable).
* Only supports python 3.5+, aggressively (which means, we can use async, and we remove all things from python 2/six compat) * Only supports python 3.5+, aggressively (which means, we can use async, and we remove all things from python 2/six compat)
* Removes all thing deprecated and/or not really convincing * Removes all thing deprecated and/or not really convincing from rdc.etl.
* We want transforms to be simple callables, so refactoring of the harness mess * We want transforms to be simple callables, so refactoring of the harness mess.
* We want to use plain python data structures, so hashes are removed. If you use python 3.6, you may even get sorted dicts. * We want to use plain python data structures, so hashes are removed. If you use python 3.6, you may even get sorted dicts.
* Input/output MUX DEMUX removed, maybe no need for that in the real world. May come back, but not in 1.0 * Input/output MUX DEMUX removed, maybe no need for that in the real world. May come back, but not in 1.0
* Change dependency policy. We need to include only the very basic requirements (and very required). Everything related to transforms that we may not use (bs, sqla, ...) should be optional dependencies. * Change dependency policy. We need to include only the very basic requirements (and very required). Everything related
* execution strategies !!! to transforms that we may not use (bs, sqla, ...) should be optional dependencies.
* Execution strategies, threaded by default.

View File

@ -1,11 +1,34 @@
Bonobo with Jupyter Bonobo with Jupyter
================== ==================
There is a builtin plugin that integrates (kind of minimalistically, for now) bonobo within jupyter notebooks, so
you can read the execution status of a graph within a nice (ok not so nice) html/javascript widget.
See https://github.com/jupyter-widgets/widget-cookiecutter for the base template used.
Installation Installation
:::::::::::: ::::::::::::
Overview To install the widget::
::::::::
jupyter nbextension enable --py --sys-prefix bonobo.ext.jupyter
Development
:::::::::::
To install the widget for development, make sure you're using an editable install of bonobo (see install document)::
jupyter nbextension install --py --symlink --sys-prefix bonobo.ext.jupyter
jupyter nbextension enable --py --sys-prefix bonobo.ext.jupyter
If you wanna change the javascript, you should run webpack in watch mode in some terminal::
cd bonobo/ext/jupyter/js
npm install
./node_modules/.bin/webpack --watch
To compile the widget into a distributable version (which gets packaged on PyPI when a release is made), just run
webpack::
./node_modules/.bin/webpack
Details
:::::::