Merge remote-tracking branch 'upstream/0.2' into colorama
This commit is contained in:
@ -67,17 +67,36 @@ def create_strategy(name=None):
|
||||
|
||||
return factory()
|
||||
|
||||
def _is_interactive_console():
|
||||
import sys
|
||||
return sys.stdout.isatty()
|
||||
|
||||
def _is_jupyter_notebook():
|
||||
try:
|
||||
return get_ipython().__class__.__name__ == 'ZMQInteractiveShell'
|
||||
except NameError:
|
||||
return False
|
||||
|
||||
def run(graph, *chain, strategy=None, plugins=None):
|
||||
strategy = create_strategy(strategy)
|
||||
|
||||
if len(chain):
|
||||
warnings.warn('DEPRECATED. You should pass a Graph instance instead of a chain.')
|
||||
from bonobo import Graph
|
||||
graph = Graph(graph, *chain)
|
||||
|
||||
return strategy.execute(graph, plugins=plugins)
|
||||
strategy = create_strategy(strategy)
|
||||
plugins = []
|
||||
|
||||
if _is_interactive_console():
|
||||
from bonobo.ext.console import ConsoleOutputPlugin
|
||||
if ConsoleOutputPlugin not in plugins:
|
||||
plugins.append(ConsoleOutputPlugin)
|
||||
|
||||
if _is_jupyter_notebook():
|
||||
from bonobo.ext.jupyter import JupyterOutputPlugin
|
||||
if JupyterOutputPlugin not in plugins:
|
||||
plugins.append(JupyterOutputPlugin)
|
||||
|
||||
return strategy.execute(graph, plugins=plugins)
|
||||
|
||||
del sys
|
||||
del warnings
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
import argparse
|
||||
|
||||
from bonobo import Graph, run
|
||||
import bonobo
|
||||
|
||||
|
||||
def execute(file, quiet=False):
|
||||
@ -22,7 +21,7 @@ def execute(file, quiet=False):
|
||||
except Exception as exc:
|
||||
raise
|
||||
|
||||
graphs = dict((k, v) for k, v in context.items() if isinstance(v, Graph))
|
||||
graphs = dict((k, v) for k, v in context.items() if isinstance(v, bonobo.Graph))
|
||||
|
||||
assert len(graphs) == 1, (
|
||||
'Having zero or more than one graph definition in one file is unsupported for now, '
|
||||
@ -34,7 +33,7 @@ def execute(file, quiet=False):
|
||||
# todo if console and not quiet, then add the console plugin
|
||||
# todo when better console plugin, add it if console and just disable display
|
||||
|
||||
return run(graph)
|
||||
return bonobo.run(graph)
|
||||
|
||||
|
||||
def register(parser):
|
||||
|
||||
@ -76,7 +76,7 @@ class GraphExecutionContext:
|
||||
def ensure_tuple(tuple_or_mixed):
|
||||
if isinstance(tuple_or_mixed, tuple):
|
||||
return tuple_or_mixed
|
||||
return (tuple_or_mixed, )
|
||||
return (tuple_or_mixed,)
|
||||
|
||||
|
||||
class LoopingExecutionContext(Wrapper):
|
||||
@ -170,16 +170,24 @@ class LoopingExecutionContext(Wrapper):
|
||||
|
||||
|
||||
class PluginExecutionContext(LoopingExecutionContext):
|
||||
PERIOD = 0.5
|
||||
|
||||
def __init__(self, wrapped, parent):
|
||||
LoopingExecutionContext.__init__(self, wrapped, parent)
|
||||
# Instanciate plugin. This is not yet considered stable, as at some point we may need a way to configure
|
||||
# plugins, for example if it depends on an external service.
|
||||
super().__init__(wrapped(self), parent)
|
||||
|
||||
def shutdown(self):
|
||||
self.wrapped.finalize(self)
|
||||
self.alive = False
|
||||
try:
|
||||
self.wrapped.finalize()
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
self.handle_error(exc, traceback.format_exc())
|
||||
finally:
|
||||
self.alive = False
|
||||
|
||||
def step(self):
|
||||
try:
|
||||
self.wrapped.run(self)
|
||||
self.wrapped.run()
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
self.handle_error(exc, traceback.format_exc())
|
||||
|
||||
|
||||
@ -22,7 +22,6 @@ class ExecutorStrategy(Strategy):
|
||||
|
||||
def execute(self, graph, *args, plugins=None, **kwargs):
|
||||
context = self.create_graph_execution_context(graph, plugins=plugins)
|
||||
|
||||
context.recv(BEGIN, Bag(), END)
|
||||
|
||||
executor = self.create_executor()
|
||||
@ -30,16 +29,13 @@ class ExecutorStrategy(Strategy):
|
||||
futures = []
|
||||
|
||||
for plugin_context in context.plugins:
|
||||
|
||||
def _runner(plugin_context=plugin_context):
|
||||
plugin_context.start()
|
||||
plugin_context.loop()
|
||||
plugin_context.stop()
|
||||
|
||||
futures.append(executor.submit(_runner))
|
||||
|
||||
for node_context in context.nodes:
|
||||
|
||||
def _runner(node_context=node_context):
|
||||
node_context.start()
|
||||
node_context.loop()
|
||||
@ -67,6 +63,7 @@ class ProcessPoolExecutorStrategy(ExecutorStrategy):
|
||||
|
||||
class ThreadCollectionStrategy(Strategy):
|
||||
def execute(self, graph, *args, plugins=None, **kwargs):
|
||||
print(type(self), 'execute', graph, args, plugins, kwargs)
|
||||
context = self.create_graph_execution_context(graph, plugins=plugins)
|
||||
context.recv(BEGIN, Bag(), END)
|
||||
|
||||
|
||||
@ -1,19 +1,11 @@
|
||||
from bonobo import Graph, ThreadPoolExecutorStrategy
|
||||
|
||||
|
||||
def yield_from(*args):
|
||||
yield from args
|
||||
|
||||
import bonobo
|
||||
|
||||
# Represent our data processor as a simple directed graph of callables.
|
||||
graph = Graph(
|
||||
lambda: (x for x in ('foo', 'bar', 'baz')),
|
||||
graph = bonobo.Graph(
|
||||
['foo', 'bar', 'baz'],
|
||||
str.upper,
|
||||
print,
|
||||
)
|
||||
|
||||
# Use a thread pool.
|
||||
executor = ThreadPoolExecutorStrategy()
|
||||
|
||||
# Run the thing.
|
||||
executor.execute(graph)
|
||||
if __name__ == '__main__':
|
||||
bonobo.run(graph)
|
||||
|
||||
@ -14,10 +14,11 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import sys
|
||||
from functools import lru_cache
|
||||
import blessings
|
||||
|
||||
import blessings
|
||||
import os
|
||||
import psutil
|
||||
|
||||
from bonobo.core.plugins import Plugin
|
||||
@ -27,7 +28,7 @@ from bonobo.util import terminal as t
|
||||
@lru_cache(1)
|
||||
def memory_usage():
|
||||
process = psutil.Process(os.getpid())
|
||||
return process.get_memory_info()[0] / float(2**20)
|
||||
return process.get_memory_info()[0] / float(2 ** 20)
|
||||
|
||||
|
||||
# @lru_cache(64)
|
||||
@ -46,10 +47,11 @@ class ConsoleOutputPlugin(Plugin):
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, prefix=''):
|
||||
self.prefix = prefix
|
||||
def __init__(self, context):
|
||||
self.context = context
|
||||
self.prefix = ''
|
||||
|
||||
def _write(self, context, rewind):
|
||||
def _write(self, graph_context, rewind):
|
||||
profile, debug = False, False
|
||||
if profile:
|
||||
append = (
|
||||
@ -58,18 +60,16 @@ class ConsoleOutputPlugin(Plugin):
|
||||
)
|
||||
else:
|
||||
append = ()
|
||||
self.write(context, prefix=self.prefix, append=append, debug=debug, profile=profile, rewind=rewind)
|
||||
self.write(graph_context, prefix=self.prefix, append=append, debug=debug, profile=profile, rewind=rewind)
|
||||
|
||||
# self.widget.value = [repr(component) for component in context.parent.components]
|
||||
|
||||
def run(self, context):
|
||||
if t.is_a_tty:
|
||||
self._write(context.parent, rewind=True)
|
||||
def run(self):
|
||||
if sys.stdout.isatty():
|
||||
self._write(self.context.parent, rewind=True)
|
||||
else:
|
||||
pass # not a tty
|
||||
|
||||
def finalize(self, context):
|
||||
self._write(context.parent, rewind=False)
|
||||
def finalize(self):
|
||||
self._write(self.context.parent, rewind=False)
|
||||
|
||||
@staticmethod
|
||||
def write(context, prefix='', rewind=True, append=None, debug=False, profile=False):
|
||||
|
||||
@ -1 +0,0 @@
|
||||
|
||||
1
bonobo/ext/jupyter/js/dist/index.js
vendored
1
bonobo/ext/jupyter/js/dist/index.js
vendored
@ -70,7 +70,6 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
|
||||
// When serialiazing entire widget state for embedding, only values different from the
|
||||
// defaults will be specified.
|
||||
|
||||
|
||||
var BonoboModel = widgets.DOMWidgetModel.extend({
|
||||
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
|
||||
_model_name: 'BonoboModel',
|
||||
|
||||
2
bonobo/ext/jupyter/js/dist/index.js.map
vendored
2
bonobo/ext/jupyter/js/dist/index.js.map
vendored
File diff suppressed because one or more lines are too long
@ -9,7 +9,6 @@ var _ = require('underscore');
|
||||
// When serialiazing entire widget state for embedding, only values different from the
|
||||
// defaults will be specified.
|
||||
|
||||
|
||||
var BonoboModel = widgets.DOMWidgetModel.extend({
|
||||
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
|
||||
_model_name: 'BonoboModel',
|
||||
|
||||
@ -14,11 +14,22 @@ except ImportError as e:
|
||||
|
||||
|
||||
class JupyterOutputPlugin(Plugin):
|
||||
def initialize(self, context):
|
||||
def __init__(self, context):
|
||||
self.context = context
|
||||
self.widget = BonoboWidget()
|
||||
IPython.core.display.display(self.widget)
|
||||
|
||||
def run(self, context):
|
||||
self.widget.value = [repr(component) for component in context.parent.components]
|
||||
def run(self):
|
||||
self.widget.value = [repr(node) for node in self.context.parent.nodes]
|
||||
|
||||
finalize = run
|
||||
|
||||
|
||||
"""
|
||||
TODO JUPYTER WIDGET
|
||||
###################
|
||||
|
||||
# close the widget? what does it do?
|
||||
https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Basics.html#Closing-widgets
|
||||
|
||||
"""
|
||||
|
||||
@ -73,7 +73,6 @@ define(["jupyter-js-widgets"], function(__WEBPACK_EXTERNAL_MODULE_2__) { return
|
||||
// When serialiazing entire widget state for embedding, only values different from the
|
||||
// defaults will be specified.
|
||||
|
||||
|
||||
var BonoboModel = widgets.DOMWidgetModel.extend({
|
||||
defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
|
||||
_model_name: 'BonoboModel',
|
||||
|
||||
File diff suppressed because one or more lines are too long
@ -20,11 +20,12 @@ class OpenDataSoftAPI(Configurable):
|
||||
netloc = Option(str, default='data.opendatasoft.com')
|
||||
path = Option(path_str, default='/api/records/1.0/search/')
|
||||
rows = Option(int, default=100)
|
||||
timezone = Option(str, default='Europe/Paris')
|
||||
kwargs = Option(dict, default=dict)
|
||||
|
||||
@ContextProcessor
|
||||
def compute_path(self, context):
|
||||
params = (('dataset', self.dataset), ('rows', self.rows), ) + tuple(sorted(self.kwargs.items()))
|
||||
params = (('dataset', self.dataset), ('rows', self.rows), ('timezone', self.timezone)) + tuple(sorted(self.kwargs.items()))
|
||||
yield self.endpoint.format(scheme=self.scheme, netloc=self.netloc, path=self.path) + '?' + urlencode(params)
|
||||
|
||||
@ContextProcessor
|
||||
|
||||
Reference in New Issue
Block a user