Merge pull request #30 from hartym/plugins

Refactoring plugin architecture, fixing jupyter integration and a few…
This commit is contained in:
Romain Dorgueil
2017-04-24 21:10:31 +02:00
committed by GitHub
15 changed files with 116 additions and 63 deletions

View File

@ -67,17 +67,36 @@ def create_strategy(name=None):
return factory()
def _is_interactive_console():
    """
    Tell whether standard output is attached to a terminal.

    :return: ``True`` when ``sys.stdout`` reports being a tty, ``False`` otherwise -- including when
             ``sys.stdout`` is ``None``, has no ``isatty`` method, or has already been closed.
    """
    # Imported locally: this module removes ``sys`` from its namespace (``del sys``) once it is loaded.
    import sys

    isatty = getattr(sys.stdout, 'isatty', None)
    if isatty is None:
        # No stream at all, or a replacement object that does not implement the file API.
        return False

    try:
        return bool(isatty())
    except ValueError:
        # Closed file objects raise ValueError on isatty(); a closed stream is not a console.
        return False
def _is_jupyter_notebook():
    """
    Tell whether the code is running under a shell whose class is named ``ZMQInteractiveShell``.

    :return: ``True`` if a ``get_ipython`` callable is available and the object it returns has that class
             name, ``False`` if it returns anything else or if ``get_ipython`` is not defined at all.
    """
    try:
        shell = get_ipython()
    except NameError:
        # ``get_ipython`` is not defined in this interpreter.
        return False

    shell_class_name = shell.__class__.__name__
    return shell_class_name == 'ZMQInteractiveShell'
def run(graph, *chain, strategy=None, plugins=None):
    """
    Execute a graph with the selected strategy, adding output plugins that match the environment.

    :param graph: the graph to execute. For backward compatibility it may also be the first element of a
                  chain, see ``chain``.
    :param chain: deprecated. When non-empty, a ``bonobo.Graph`` is built from ``graph`` and ``chain`` and a
                  warning is emitted.
    :param strategy: forwarded to ``create_strategy`` to obtain the strategy object.
    :param plugins: optional iterable of plugins to run along with the graph. It is copied, never mutated.
                    The console and jupyter output plugins are appended when the matching environment is
                    detected and they are not already listed.
    :return: the value returned by ``strategy.execute``.
    """
    strategy = create_strategy(strategy)

    if chain:
        # Imported locally: this module removes ``warnings`` from its namespace (``del warnings``) once it
        # is loaded, so the module-level name is gone by the time this function is called.
        import warnings
        warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.')
        from bonobo import Graph
        graph = Graph(graph, *chain)

    # Keep what the caller asked for (it used to be silently replaced by an empty list), working on a copy
    # so the caller's own list is left untouched.
    plugins = list(plugins) if plugins else []

    if _is_interactive_console():
        from bonobo.ext.console import ConsoleOutputPlugin
        if ConsoleOutputPlugin not in plugins:
            plugins.append(ConsoleOutputPlugin)

    if _is_jupyter_notebook():
        from bonobo.ext.jupyter import JupyterOutputPlugin
        if JupyterOutputPlugin not in plugins:
            plugins.append(JupyterOutputPlugin)

    return strategy.execute(graph, plugins=plugins)
del sys
del warnings

View File

@ -1,6 +1,5 @@
import argparse
from bonobo import Graph, run
import bonobo
def execute(file, quiet=False):
@ -22,7 +21,7 @@ def execute(file, quiet=False):
except Exception as exc:
raise
graphs = dict((k, v) for k, v in context.items() if isinstance(v, Graph))
graphs = dict((k, v) for k, v in context.items() if isinstance(v, bonobo.Graph))
assert len(graphs) == 1, (
'Having zero or more than one graph definition in one file is unsupported for now, '
@ -34,7 +33,7 @@ def execute(file, quiet=False):
# todo if console and not quiet, then add the console plugin
# todo when better console plugin, add it if console and just disable display
return run(graph)
return bonobo.run(graph)
def register(parser):

View File

@ -76,7 +76,7 @@ class GraphExecutionContext:
def ensure_tuple(tuple_or_mixed):
    """
    Return a tuple for any input.

    :param tuple_or_mixed: any value.
    :return: ``tuple_or_mixed`` itself when it is already a tuple (subclasses included), otherwise a
             one-element tuple containing it.
    """
    if isinstance(tuple_or_mixed, tuple):
        return tuple_or_mixed
    return (tuple_or_mixed,)
class LoopingExecutionContext(Wrapper):
@ -171,16 +171,24 @@ class LoopingExecutionContext(Wrapper):
class PluginExecutionContext(LoopingExecutionContext):
PERIOD = 0.5
def __init__(self, wrapped, parent):
LoopingExecutionContext.__init__(self, wrapped, parent)
# Instantiate plugin. This is not yet considered stable, as at some point we may need a way to configure
# plugins, for example if it depends on an external service.
super().__init__(wrapped(self), parent)
def shutdown(self):
self.wrapped.finalize(self)
self.alive = False
try:
self.wrapped.finalize()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
finally:
self.alive = False
def step(self):
try:
self.wrapped.run(self)
self.wrapped.run()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())

View File

@ -22,7 +22,6 @@ class ExecutorStrategy(Strategy):
def execute(self, graph, *args, plugins=None, **kwargs):
context = self.create_graph_execution_context(graph, plugins=plugins)
context.recv(BEGIN, Bag(), END)
executor = self.create_executor()
@ -30,16 +29,13 @@ class ExecutorStrategy(Strategy):
futures = []
for plugin_context in context.plugins:
def _runner(plugin_context=plugin_context):
plugin_context.start()
plugin_context.loop()
plugin_context.stop()
futures.append(executor.submit(_runner))
for node_context in context.nodes:
def _runner(node_context=node_context):
node_context.start()
node_context.loop()
@ -67,6 +63,7 @@ class ProcessPoolExecutorStrategy(ExecutorStrategy):
class ThreadCollectionStrategy(Strategy):
def execute(self, graph, *args, plugins=None, **kwargs):
print(type(self), 'execute', graph, args, plugins, kwargs)
context = self.create_graph_execution_context(graph, plugins=plugins)
context.recv(BEGIN, Bag(), END)

View File

@ -1,19 +1,11 @@
from bonobo import Graph, ThreadPoolExecutorStrategy
def yield_from(*args):
yield from args
import bonobo
# Represent our data processor as a simple directed graph of callables.
graph = Graph(
lambda: (x for x in ('foo', 'bar', 'baz')),
graph = bonobo.Graph(
['foo', 'bar', 'baz'],
str.upper,
print,
)
# Use a thread pool.
executor = ThreadPoolExecutorStrategy()
# Run the thing.
executor.execute(graph)
if __name__ == '__main__':
bonobo.run(graph)

View File

@ -14,10 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from functools import lru_cache
import blessings
import blessings
import os
import psutil
from bonobo.core.plugins import Plugin
@ -28,7 +29,7 @@ t = blessings.Terminal()
@lru_cache(1)
def memory_usage():
    """
    Return the memory used by the current process, expressed in units of 2**20 bytes.

    The value is the first field of psutil's memory info for the current pid (presumably the resident set
    size, per psutil's documentation -- confirm against the installed version).

    NOTE(review): the function takes no argument, so ``lru_cache`` hands back the first computed value on
    every later call. Confirm that a frozen reading is what is wanted here.
    """
    process = psutil.Process(os.getpid())
    # psutil renamed ``get_memory_info()`` to ``memory_info()``; prefer the current name and fall back to
    # the legacy one so both old and new psutil releases work.
    read_memory_info = getattr(process, 'memory_info', None) or process.get_memory_info
    return read_memory_info()[0] / float(2 ** 20)
# @lru_cache(64)
@ -47,10 +48,11 @@ class ConsoleOutputPlugin(Plugin):
"""
def __init__(self, prefix=''):
self.prefix = prefix
def __init__(self, context):
self.context = context
self.prefix = ''
def _write(self, context, rewind):
def _write(self, graph_context, rewind):
profile, debug = False, False
if profile:
append = (
@ -59,18 +61,16 @@ class ConsoleOutputPlugin(Plugin):
)
else:
append = ()
self.write(context, prefix=self.prefix, append=append, debug=debug, profile=profile, rewind=rewind)
self.write(graph_context, prefix=self.prefix, append=append, debug=debug, profile=profile, rewind=rewind)
# self.widget.value = [repr(component) for component in context.parent.components]
def run(self, context):
if t.is_a_tty:
self._write(context.parent, rewind=True)
def run(self):
if sys.stdout.isatty():
self._write(self.context.parent, rewind=True)
else:
pass # not a tty
def finalize(self, context):
self._write(context.parent, rewind=False)
def finalize(self):
self._write(self.context.parent, rewind=False)
@staticmethod
def write(context, prefix='', rewind=True, append=None, debug=False, profile=False):

View File

@ -1 +0,0 @@

View File

@ -70,7 +70,6 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
// When serializing entire widget state for embedding, only values different from the
// defaults will be specified.
var BonoboModel = widgets.DOMWidgetModel.extend({
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
_model_name: 'BonoboModel',

File diff suppressed because one or more lines are too long

View File

@ -9,7 +9,6 @@ var _ = require('underscore');
// When serializing entire widget state for embedding, only values different from the
// defaults will be specified.
var BonoboModel = widgets.DOMWidgetModel.extend({
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
_model_name: 'BonoboModel',

View File

@ -14,11 +14,22 @@ except ImportError as e:
class JupyterOutputPlugin(Plugin):
def initialize(self, context):
def __init__(self, context):
self.context = context
self.widget = BonoboWidget()
IPython.core.display.display(self.widget)
def run(self, context):
self.widget.value = [repr(component) for component in context.parent.components]
def run(self):
self.widget.value = [repr(node) for node in self.context.parent.nodes]
finalize = run
"""
TODO JUPYTER WIDGET
###################
# close the widget? what does it do?
https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Basics.html#Closing-widgets
"""

View File

@ -73,7 +73,6 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
// When serializing entire widget state for embedding, only values different from the
// defaults will be specified.
var BonoboModel = widgets.DOMWidgetModel.extend({
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
_model_name: 'BonoboModel',

File diff suppressed because one or more lines are too long

View File

@ -2,22 +2,30 @@ Changelog
=========
0.9.0
:::::
Incoming...
:::::::::::
* todo migrate doc
* todo migrate tests
* todo migrate transforms ?
Version 0.3
:::::::::::
* Autodetect if within jupyter notebook context, and apply plugin if it's the case.
* Console run should allow console plugin as a command line argument.
* New bonobo.structs package with simple data structures (bags, graphs, tokens).
Initial release
:::::::::::::::
* Migration from rdc.etl.
* New cool name.
* New cool name (ok, that's debatable).
* Only supports python 3.5+, aggressively (which means, we can use async, and we remove all things from python 2/six compat)
* Removes all thing deprecated and/or not really convincing
* We want transforms to be simple callables, so refactoring of the harness mess
* Removes everything from rdc.etl that was deprecated and/or not really convincing.
* We want transforms to be simple callables, so refactoring of the harness mess.
* We want to use plain python data structures, so hashes are removed. If you use python 3.6, you may even get sorted dicts.
* Input/output MUX DEMUX removed, maybe no need for that in the real world. May come back, but not in 1.0
* Change dependency policy. We need to include only the very basic requirements (and very required). Everything related to transforms that we may not use (bs, sqla, ...) should be optional dependencies.
* execution strategies !!!
* Change dependency policy. We need to include only the very basic requirements (and very required). Everything related
to transforms that we may not use (bs, sqla, ...) should be optional dependencies.
* Execution strategies, threaded by default.

View File

@ -1,11 +1,34 @@
Bonobo with Jupyter
===================
There is a builtin plugin that integrates (kind of minimalistically, for now) bonobo within jupyter notebooks, so
you can read the execution status of a graph within a nice (ok not so nice) html/javascript widget.
See https://github.com/jupyter-widgets/widget-cookiecutter for the base template used.
Installation
::::::::::::
Overview
::::::::
To install the widget::
jupyter nbextension enable --py --sys-prefix bonobo.ext.jupyter
Development
:::::::::::
To install the widget for development, make sure you're using an editable install of bonobo (see install document)::
jupyter nbextension install --py --symlink --sys-prefix bonobo.ext.jupyter
jupyter nbextension enable --py --sys-prefix bonobo.ext.jupyter
If you want to change the JavaScript, run webpack in watch mode in a separate terminal::
cd bonobo/ext/jupyter/js
npm install
./node_modules/.bin/webpack --watch
To compile the widget into a distributable version (which gets packaged on PyPI when a release is made), just run
webpack::
./node_modules/.bin/webpack
Details
:::::::