basic reimplementation of what was working in rdc.etl, jupyter widget, json import, demo with opendatasoft api, etc. A lot of things are still a work in progress, stay tuned.

This commit is contained in:
Romain Dorgueil
2016-12-24 10:37:53 +01:00
parent f0315936d3
commit c30048f1b0
54 changed files with 4680 additions and 256 deletions

1
.gitignore vendored
View File

@ -23,6 +23,7 @@
.webassets-cache
/.idea
/bonobo.iml
/bonobo/ext/jupyter/js/node_modules/
/build/
/coverage.xml
/develop-eggs/

View File

@ -19,7 +19,15 @@ enable_features = {
'python',
}
install_requires = [
'psutil >=5.0,<5.1',
]
extras_require = {
'jupyter': [
'jupyter >=1.0,<1.1',
'ipywidgets >=6.0.0.beta5'
],
'dev': [
'coverage >=4.2,<4.3',
'mock >=2.0,<2.1',
@ -32,3 +40,11 @@ extras_require = {
],
}
data_files = [
('share/jupyter/nbextensions/bonobo-jupyter', [
'bonobo/ext/jupyter/static/extension.js',
'bonobo/ext/jupyter/static/index.js',
'bonobo/ext/jupyter/static/index.js.map',
]),
]

View File

@ -1,4 +1,15 @@
__title__ = 'bonobo'
__version__ = '0.0.0'
__author__ = 'Romain Dorgueil'
__license__ = 'Apache 2.0'
import sys
from .core import Graph, NaiveStrategy, ProcessPoolExecutorStrategy, ThreadPoolExecutorStrategy, inject, service
PY35 = (sys.version_info >= (3, 5))
assert PY35, 'Python 3.5+ is required to use Bonobo.'
# Public API of the package. ``__all__`` must contain the *names* as strings:
# listing the objects themselves makes ``from bonobo import *`` fail with a
# TypeError.
__all__ = [
    'Graph',
    'NaiveStrategy',
    'ProcessPoolExecutorStrategy',
    'ThreadPoolExecutorStrategy',
    'inject',
    'service',
]

View File

@ -0,0 +1,13 @@
from .graphs import Graph
from .services import inject, service
from .strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy
from .strategies.naive import NaiveStrategy
# Public API of ``bonobo.core``. Entries must be strings (the exported names),
# not the objects: a star-import raises TypeError on non-string items.
__all__ = [
    'Graph',
    'NaiveStrategy',
    'ProcessPoolExecutorStrategy',
    'ThreadPoolExecutorStrategy',
    'inject',
    'service',
]

206
bonobo/core/contexts.py Normal file
View File

@ -0,0 +1,206 @@
import traceback
import types
from functools import partial
from queue import Empty
from time import sleep
from bonobo.core.errors import InactiveReadableError
from bonobo.core.inputs import Input
from bonobo.core.stats import WithStatistics
from bonobo.util.lifecycle import get_initializer, get_finalizer
from bonobo.util.time import Timer
from bonobo.util.tokens import BEGIN, END, NEW, RUNNING, TERMINATED
class ExecutionContext:
    """
    Execution state for one graph run.

    Wraps every component of the graph in a ComponentExecutionContext and every
    plugin in a PluginExecutionContext, then connects each component context to
    the inputs of its successors as reported by ``graph.outputs_of(index)``.

    Behaves like a read-only sequence of component contexts (indexing, ``len``,
    iteration).
    """

    def __init__(self, graph, plugins=None):
        self.graph = graph
        self.components = [ComponentExecutionContext(node, self) for node in self.graph.components]
        self.plugins = [PluginExecutionContext(item, parent=self) for item in plugins or ()]

        for index, node_context in enumerate(self):
            try:
                targets = [self[successor].input for successor in self.graph.outputs_of(index)]
            except KeyError:
                # The graph has no output entry for this index: leave the context unwired.
                continue
            node_context.outputs = targets

            # Propagate the BEGIN/END control tokens received on the input to
            # the downstream inputs, without counting them as output rows.
            node_context.input.on_begin = partial(node_context.send, BEGIN, _control=True)
            node_context.input.on_end = partial(node_context.send, END, _control=True)

    def __getitem__(self, item):
        return self.components[item]

    def __len__(self):
        return len(self.components)

    def __iter__(self):
        yield from self.components

    @property
    def running(self):
        """True while at least one component context reports itself as running."""
        for node_context in self.components:
            if node_context.running:
                return True
        return False
class PluginExecutionContext:
    """
    Drives a plugin next to a running graph.

    The plugin's initializer is called once, then ``plugin.run(context)`` is
    called roughly every 0.25 second until ``shutdown()`` is requested, and
    finally the plugin's finalizer is called. Errors raised by any of those
    calls are printed and do not stop the loop.
    """

    def __init__(self, plugin, parent):
        self.parent = parent
        self.plugin = plugin
        self.alive = True

    def _guarded(self, label, call):
        """Run ``call()``; on failure print ``label`` and the exception instead of raising."""
        try:
            call()
        except Exception as exc:
            print(label, type(exc), exc)

    def run(self):
        self._guarded('error in initializer', lambda: get_initializer(self.plugin)(self))

        while self.alive:
            # todo with wrap_errors ....
            self._guarded('error', lambda: self.plugin.run(self))
            sleep(0.25)

        self._guarded('error in finalizer', lambda: get_finalizer(self.plugin)(self))

    def shutdown(self):
        """Ask the run loop to stop after its current iteration."""
        self.alive = False
class ComponentExecutionContext(WithStatistics):
    """
    Runtime wrapper around a single graph component.

    Owns the component's input queue, the list of downstream inputs it writes
    to, its lifecycle state (NEW, then RUNNING, then TERMINATED) and a few
    counters ('in', 'out', 'err', 'read', 'write').

    todo: make the counter dependant of parent context?
    """

    def __init__(self, component, parent):
        self.parent = parent
        self.component = component
        self.input = Input()
        self.outputs = []
        self.state = NEW
        self.stats = dict.fromkeys(('in', 'out', 'err', 'read', 'write'), 0)

    @property
    def name(self):
        return self.component.__name__

    @property
    def running(self):
        # A context is considered running as long as its input is alive.
        return self.input.alive

    def __repr__(self):
        """Adds "alive" information to the transform representation."""
        marker = '+' if self.running else '-'
        return ' '.join((marker, self.name, self.get_stats_as_string()))

    def get_stats(self, *args, **kwargs):
        """Return the (label, value) pairs for the 'in', 'out' and 'err' counters."""
        return tuple((label, self.stats[label]) for label in ('in', 'out', 'err'))

    def impulse(self):
        """Queue a ``None`` row on the input (a ``None`` row calls the component without argument)."""
        self.input.put(None)

    def send(self, value, _control=False):
        """Push ``value`` to every output; control tokens are not counted as output rows."""
        if not _control:
            self.stats['out'] += 1
        for target in self.outputs:
            target.put(value)

    def recv(self, value):
        """Queue ``value`` on this context's input."""
        self.input.put(value)

    def get(self):
        """Read the next row from the input, waiting at most one second."""
        return self.input.get(timeout=1)

    def _call(self, row):
        """Call the component with ``row`` (or nothing for ``None``), prefixed by this context if it asked for it."""
        call_args = [] if row is None else [row]
        if getattr(self.component, '_with_context', False):
            call_args.insert(0, self)
        return self.component(*call_args)

    def step(self, finalize=False):
        """Runs a transformation callable with given args/kwargs and flush the result into the right
        output channel."""
        row = self.get()
        self.stats['in'] += 1
        results = self._call(row)

        if isinstance(results, types.GeneratorType):
            # Generators are drained lazily, one output row at a time.
            for result in results:
                self.send(result)
        elif results is not None:
            self.send(results)

    def run(self):
        """Initialize the component, process input until it becomes inactive, then finalize."""
        assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at the '
                                   'beginning of a run().').format(type(self).__name__, NEW)
        self.state = RUNNING

        try:
            get_initializer(self.component)(self)
        except Exception as exc:
            self.handle_error(exc, traceback.format_exc())

        while True:
            try:
                self.step()
            except KeyboardInterrupt:
                raise
            except InactiveReadableError:
                sleep(1)
                # Terminated, exit loop.
                break
            except Empty:
                # Nothing to read within the timeout, try again.
                continue
            except Exception as exc:
                self.handle_error(exc, traceback.format_exc())

        assert self.state is RUNNING, ('A {} must be in {} state when finalization starts.').format(
            type(self).__name__, RUNNING)
        self.state = TERMINATED

        try:
            get_finalizer(self.component)(self)
        except Exception as exc:
            self.handle_error(exc, traceback.format_exc())

    def handle_error(self, exc, tb):
        """Count the error and print the exception type, the component and the traceback."""
        self.stats['err'] += 1
        print('\U0001F4A3 {} in {}'.format(type(exc).__name__, self.component))
        print(tb)

View File

@ -1,4 +1,4 @@
from bonobo.core.tokens import BEGIN
from bonobo.util.tokens import BEGIN
class Graph:

View File

@ -18,7 +18,8 @@ from abc import ABCMeta, abstractmethod
from queue import Queue
from bonobo.core.errors import AbstractError, InactiveWritableError, InactiveReadableError
from bonobo.core.tokens import BEGIN, END
from bonobo.util import noop
from bonobo.util.tokens import BEGIN, END
BUFFER_SIZE = 8192
@ -47,12 +48,18 @@ class Input(Queue, Readable, Writable):
self._runlevel = 0
self._writable_runlevel = 0
self.on_begin = noop
self.on_end = noop
def put(self, data, block=True, timeout=None):
# Begin token is a metadata to raise the input runlevel.
if data == BEGIN:
self._runlevel += 1
self._writable_runlevel += 1
# callback
self.on_begin()
return
# Check we are actually able to receive data.
@ -72,6 +79,10 @@ class Input(Queue, Readable, Writable):
if data == END:
self._runlevel -= 1
# callback
self.on_end()
if not self.alive:
raise InactiveReadableError(
'Cannot get() on an inactive {} (runlevel just reached 0).'.format(Readable.__name__))

60
bonobo/core/services.py Normal file
View File

@ -0,0 +1,60 @@
import functools
import itertools
from functools import partial
class service:
    """
    Lazy, cached wrapper around a factory callable.

    Calling the service builds the instance with the factory on first use and
    returns the stored instance afterwards (as long as the stored value is not
    ``None``). ``define()`` derives a child service with pre-bound factory
    arguments; children can be looked up again with ``parent[child]``.
    """

    def __init__(self, factory):
        self.factory = factory
        self.instance = None
        self.children = set()

    def __call__(self, *args, **kwargs):
        if self.instance is not None:
            return self.instance
        self.instance = self.factory(*args, **kwargs)
        return self.instance

    def __getitem__(self, item):
        """Return ``item`` if it is a known child service, raise KeyError otherwise."""
        if item in self.children:
            return item
        raise KeyError(item)

    def define(self, *args, **kwargs):
        """Create, register and return a child service whose factory has ``args``/``kwargs`` bound."""
        bound_factory = partial(self.factory, *args, **kwargs)
        child = type(self)(bound_factory)
        self.children.add(child)
        return child
call = lambda s: s()
def resolve(func):
return func()
def inject(*iargs, **ikwargs):
    """
    Inject service dependencies.

    Returns a decorator. Each time the decorated callable is invoked, every
    dependency in ``iargs`` is called and the results are passed as the first
    positional arguments, before the caller's own. Every dependency in
    ``ikwargs`` is called and passed as a keyword argument, overriding a
    caller-provided keyword of the same name.
    """

    def wrapper(target):
        @functools.wraps(target)
        def wrapped(*args, **kwargs):
            positional = [dependency() for dependency in iargs]
            positional.extend(args)

            named = dict(kwargs)
            for key, dependency in ikwargs.items():
                named[key] = dependency()

            return target(*positional, **named)

        return wrapped

    return wrapper

View File

View File

@ -0,0 +1,15 @@
from bonobo.core.contexts import ExecutionContext
class Strategy:
    """
    Base class for execution strategies.

    Subclasses implement ``execute()``; ``context_type`` is the class used to
    build the execution context for a graph.
    """

    context_type = ExecutionContext

    def create_context(self, graph, *args, **kwargs):
        """Instantiate ``context_type`` for ``graph``, forwarding any extra arguments."""
        factory = self.context_type
        return factory(graph, *args, **kwargs)

    def execute(self, graph, *args, **kwargs):
        """Run ``graph``. Must be provided by subclasses."""
        raise NotImplementedError

View File

@ -0,0 +1,52 @@
import time
from concurrent.futures import Executor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from bonobo.core.strategies.base import Strategy
from bonobo.util.tokens import BEGIN, END
class ExecutorStrategy(Strategy):
    """
    Strategy based on a concurrent.futures.Executor subclass (or similar interface).

    Subclasses set ``executor_factory`` to the executor class to instantiate.
    """

    executor_factory = Executor

    def execute(self, graph, *args, plugins=None, **kwargs):
        """
        Run ``graph`` with one executor job per plugin and per component.

        :param graph: graph to execute.
        :param plugins: optional iterable of plugins, run alongside the components.

        Blocks until no component context is running anymore. Extra positional
        and keyword arguments are accepted but not used.
        """
        context = self.create_context(graph, plugins=plugins)
        executor = self.executor_factory()

        # Kick off the components plugged on BEGIN: open their input, send a
        # single empty row, then close the input.
        for i in graph.outputs_of(BEGIN):
            context[i].recv(BEGIN)
            context[i].recv(None)
            context[i].recv(END)

        for plugin_context in context.plugins:
            executor.submit(plugin_context.run)

        for component_context in context.components:
            executor.submit(component_context.run)

        try:
            while context.running:
                time.sleep(0.2)
        finally:
            # Plugin loops only stop when asked to: do it even if the wait is
            # interrupted (e.g. KeyboardInterrupt), otherwise they spin forever.
            for plugin_context in context.plugins:
                plugin_context.shutdown()

        executor.shutdown()
class ThreadPoolExecutorStrategy(ExecutorStrategy):
executor_factory = ThreadPoolExecutor
class ProcessPoolExecutorStrategy(ExecutorStrategy):
executor_factory = ProcessPoolExecutor

View File

@ -0,0 +1,21 @@
from queue import Queue, Empty
from bonobo.core.strategies.base import Strategy
from bonobo.util.iterators import force_iterator
class NaiveStrategy(Strategy):
    """
    Sequential, single-threaded strategy.

    Components are run one after the other, in the order of
    ``graph.components``: the first one is called once without argument, each
    following one is called once per row produced by its predecessor.
    """

    def execute(self, graph, *args, **kwargs):
        context = self.create_context(graph)
        components = context.graph.components
        input_queues = {i: Queue() for i in range(len(components))}

        for i, component in enumerate(components):
            # The last component has no successor: its output is discarded
            # instead of failing with a KeyError on a missing queue.
            downstream = input_queues.get(i + 1)

            while True:
                try:
                    call_args = (input_queues[i].get(block=False),) if i else ()
                    for row in force_iterator(component(*call_args)):
                        if downstream is not None:
                            downstream.put(row)
                    if not i:
                        # The first component has no input queue to drain: run it only once.
                        raise Empty
                except Empty:
                    break

View File

@ -0,0 +1,3 @@

View File

@ -1,203 +0,0 @@
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from queue import Queue, Empty
from bonobo.core.errors import InactiveReadableError
from bonobo.core.io import Input
from bonobo.core.tokens import BEGIN, Token
from bonobo.util.iterators import force_iterator, IntegerSequenceGenerator
NEW = Token('New')
RUNNING = Token('Running')
TERMINATED = Token('Terminated')
def noop(*args, **kwargs): pass
def get_initializer(c):
return getattr(c, 'initialize', noop)
def get_finalizer(c):
return getattr(c, 'finalize', noop)
class ComponentExecutionContext:
"""
todo: make the counter dependant of parent context?
"""
# todo clean this xxx
__thread_counter = IntegerSequenceGenerator()
@property
def name(self):
return self.component.__name__ + '-' + str(self.__thread_number)
def __init__(self, component):
# todo clean this xxx
self.__thread_number = next(self.__class__.__thread_counter)
self.component = component
self.input = Input()
self.outputs = []
self.state = NEW
def __repr__(self):
"""Adds "alive" information to the transform representation."""
return ('+' if self.running else '-') + ' ' + self.name + ' ' + self.component.get_stats_as_string()
def step(self, finalize=False):
# Pull data from the first available input channel.
row = self.input.get(timeout=1)
self.__execute_and_handle_output(self.component, row)
def run(self):
assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at the '
'beginning of a run().').format(type(self).__name__, NEW)
self.state = RUNNING
get_initializer(self.component)(self)
while True:
try:
self.step()
except KeyboardInterrupt as e:
raise
except InactiveReadableError as e:
# Terminated, exit loop.
break
except Empty as e:
continue
except Exception as e:
self.handle_error(e, traceback.format_exc())
assert self.state is RUNNING, ('A {} must be in {} state when finalization starts.').format(
type(self).__name__, RUNNING)
try:
self.state = TERMINATED
get_finalizer(self.component)(self)
except Exception as e:
self.handle_error(e, traceback.format_exc())
def handle_error(self, exc, tb):
raise NotImplementedError()
if STDERR in self.transform.OUTPUT_CHANNELS:
self.transform._output.put(({
'transform': self.transform,
'exception': exc,
'traceback': tb,
}, STDERR,))
print((str(exc) + '\n\n' + tb + '\n\n\n\n'))
else:
print((str(exc) + '\n\n' + tb + '\n\n\n\n'))
# Private
def __execute_and_handle_output(self, callable, *args, **kwargs):
"""Runs a transformation callable with given args/kwargs and flush the result into the right
output channel."""
timer = Timer()
with timer:
results = callable(*args, **kwargs)
self._exec_time += timer.duration
# Put data onto output channels
if isinstance(results, types.GeneratorType):
while True:
timer = Timer()
with timer:
try:
result = next(results)
except StopIteration as e:
break
self._exec_time += timer.duration
self._exec_count += 1
self._output.put(result)
elif results is not None:
self._exec_count += 1
self._output.put(results)
else:
self._exec_count += 1
class ExecutionContext:
def __init__(self, graph):
self.graph = graph
class Strategy:
context_type = ExecutionContext
def create_context(self, graph, *args, **kwargs):
return self.context_type(graph)
def execute(self, graph, *args, **kwargs):
raise NotImplementedError
class NaiveStrategy(Strategy):
def execute(self, graph, *args, **kwargs):
context = self.create_context(graph)
input_queues = {i: Queue() for i in range(len(context.graph.components))}
for i, component in enumerate(context.graph.components):
while True:
try:
args = (input_queues[i].get(block=False),) if i else ()
for row in force_iterator(component(*args)):
input_queues[i + 1].put(row)
if not i:
raise Empty
except Empty:
break
class ExecutorStrategy(Strategy):
executor_type = ThreadPoolExecutor
def __init__(self, executor=None):
self.executor = executor or self.executor_type()
def execute(self, graph, *args, **kwargs):
context = self.create_context(graph)
for i in graph.outputs_of(BEGIN):
self.call_component(i, *args, **kwargs)
raise NotImplementedError()
while len(self.running):
# print(self.running)
time.sleep(0.1)
f = self.executor.submit(self.components[idx], *args, **kwargs)
self.running.add(f)
idx = i
@f.add_done_callback
def on_component_done(f):
nonlocal self, idx
outputs = self.outputs_of(idx)
results = force_iterator(f.result())
if results:
for result in results:
for output in outputs:
self.call_component(output, result)
self.running.remove(f)
def __run_component(self, component):
c_in = Input()
while c_in.alive:
row = c_in.get()
component(row)

0
bonobo/ext/__init__.py Normal file
View File

View File

@ -0,0 +1,7 @@
from .helpers import console_run
from .plugin import ConsoleOutputPlugin
# ``__all__`` must list exported names as strings, not the objects themselves.
__all__ = [
    'ConsoleOutputPlugin',
    'console_run',
]

View File

@ -0,0 +1,9 @@
from bonobo import Graph, ThreadPoolExecutorStrategy
from .plugin import ConsoleOutputPlugin
def console_run(*chain, output=True, plugins=None):
    """
    Build a graph from ``chain`` and execute it with a thread pool strategy.

    :param chain: components, added to the graph as a single chain.
    :param output: when true, a ConsoleOutputPlugin is added to the plugins.
    :param plugins: optional iterable of additional plugins; they are kept
        whether or not ``output`` is enabled.
    :return: whatever the strategy's ``execute()`` returns.
    """
    graph = Graph()
    executor = ThreadPoolExecutorStrategy()
    graph.add_chain(*chain)

    # Copy into a list so that tuples are accepted and the caller's list is not mutated.
    active_plugins = list(plugins or ())
    if output:
        active_plugins.append(ConsoleOutputPlugin())

    return executor.execute(graph, plugins=active_plugins)

View File

@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012-2014 Romain Dorgueil
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from functools import lru_cache
import blessings
import psutil
t = blessings.Terminal()
def memory_usage():
    """
    Return the resident set size of the current process, in mebibytes.

    Not cached: a cache on a function without arguments would return the very
    first measurement forever.
    """
    process = psutil.Process(os.getpid())
    # ``get_memory_info()`` was removed from psutil; ``memory_info()`` is the
    # supported spelling and ``rss`` is its first field.
    return process.memory_info().rss / float(2 ** 20)
# @lru_cache(64)
# def execution_time(harness):
# return datetime.datetime.now() - harness._started_at
class ConsoleOutputPlugin:
    """
    Outputs status information to the connected stdout. Can be a TTY, with or without support for colors/cursor
    movements, or a non tty (pipe, file, ...). The features are adapted to terminal capabilities.

    .. attribute:: prefix

        String prefix of output lines.

    """

    def __init__(self, prefix=''):
        self.prefix = prefix

    def _write(self, context, rewind):
        """Render the status of ``context`` once; profiling and debug output are currently switched off."""
        profile = debug = False

        append = ()
        if profile:
            append = (
                ('Memory', '{0:.2f} Mb'.format(memory_usage())),
                # ('Total time', '{0} s'.format(execution_time(harness))),
            )

        self.write(context, prefix=self.prefix, append=append, debug=debug, profile=profile, rewind=rewind)

    def run(self, context):
        """Refresh the status display in place; nothing is written when stdout is not a tty."""
        if not t.is_a_tty:
            return
        self._write(context.parent, rewind=True)

    def finalize(self, context):
        """Write the final status once, leaving the cursor after it."""
        self._write(context.parent, rewind=False)

    @staticmethod
    def write(context, prefix='', rewind=True, append=None, debug=False, profile=False):
        """
        Print one status line per component of ``context``.

        :param context: sized iterable of component contexts.
        :param prefix: string prepended to every component line.
        :param rewind: move the cursor back up afterwards so the next call overwrites the output.
        :param append: optional iterable of (label, value) pairs printed on one extra line.
        :param debug: forwarded to ``get_stats_as_string``.
        :param profile: forwarded to ``get_stats_as_string``.
        """
        t_cnt = len(context)

        for position, component in enumerate(context, start=1):
            label = '({})'.format(position)
            stats = component.get_stats_as_string(debug=debug, profile=profile)

            if component.running:
                _line = t.black(label) + ' ' + t.bold(t.white('+')) + ' ' + component.name + ' ' + stats + ' '
            else:
                # Stopped components are rendered entirely in black.
                _line = t.black(label + ' - ' + component.name + ' ' + stats + ' ')

            print(prefix + _line + t.clear_eol)

        if append:
            # todo handle multiline
            details = ' '.join('{0}: {1}'.format(t.bold(t.white(k)), v) for k, v in append)
            print(' `->', details, t.clear_eol)
            t_cnt += 1

        if rewind:
            print(t.clear_eol)
            print(t.move_up * (t_cnt + 2))

58
bonobo/ext/couchdb_.py Normal file
View File

@ -0,0 +1,58 @@
from bonobo import inject
try:
import couchdb
except ImportError as e:
import logging
logging.exception('You must install couchdb to use the bonobo couchdb extension. Easiest way is to install the '
'optional "couchdb" dependencies with «pip install bonobo[couchdb]», but you can also install a '
'specific version by yourself.')
import datetime
from bonobo import service
@service
def client(username, password):
client = couchdb.Server()
client.resource.credentials = (username, password,)
return client
@service
@inject(client)
def database(client, name):
return client[name]
def json_datetime(dt=None):
    """
    Format a datetime as an ISO 8601 string with second precision and a "Z" suffix.

    :param dt: datetime to format; defaults to the current UTC time. Since "Z"
        designates UTC, a provided value is expected to be a naive UTC datetime.
    :return: string such as ``'2016-12-24T10:37:53Z'``.
    """
    if dt is None:
        # Use UTC (made naive so isoformat() adds no offset): the local time
        # would be mislabelled by the "Z" suffix.
        dt = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return dt.replace(microsecond=0).isoformat() + 'Z'
@inject(database)
def query(db, map, reduce, *args, **kwargs):
pass
cli1 = client.define('admin', 'admin')
cli2 = client.define('foo', 'bar')
@inject(client[cli1])
def print_db(db):
print(db)
@inject(client[cli2])
def print_db2(db):
print(db)
if __name__ == '__main__':
print_db()
print_db2()
print_db()
print_db2()

View File

@ -0,0 +1,18 @@
from .helpers import jupyter_run
from .plugin import JupyterOutputPlugin
def _jupyter_nbextension_paths():
    """
    Describe the notebook extension bundled in this package.

    Returns a one-element list holding the section, source directory,
    destination name and requirejs entry point of the extension.
    NOTE(review): presumably read by Jupyter's nbextension tooling; confirm.
    """
    extension = dict(
        section='notebook',
        src='static',
        dest='bonobo-jupyter',
        require='bonobo-jupyter/extension',
    )
    return [extension]
# ``__all__`` must list exported names as strings, not the objects themselves.
__all__ = [
    'JupyterOutputPlugin',
    '_jupyter_nbextension_paths',
    'jupyter_run',
]

View File

@ -0,0 +1,9 @@
from bonobo import Graph, ThreadPoolExecutorStrategy
from .plugin import JupyterOutputPlugin
def jupyter_run(*chain, plugins=None):
    """
    Build a graph from ``chain`` and execute it with a thread pool strategy,
    reporting progress through a JupyterOutputPlugin.

    :param chain: components, added to the graph as a single chain.
    :param plugins: optional iterable of additional plugins, run before the
        Jupyter output plugin in the plugin list.
    :return: whatever the strategy's ``execute()`` returns.
    """
    graph = Graph()
    executor = ThreadPoolExecutorStrategy()
    graph.add_chain(*chain)

    # list(...) accepts any iterable (a tuple would break ``+ [...]``) and
    # leaves the caller's sequence untouched.
    active_plugins = list(plugins or ())
    active_plugins.append(JupyterOutputPlugin())

    return executor.execute(graph, plugins=active_plugins)

View File

@ -0,0 +1,19 @@
Bonobo integration in Jupyter
Package Install
---------------
**Prerequisites**
- [node](http://nodejs.org/)
```bash
npm install --save bonobo-jupyter
```
Watch mode (for development)
----------------------------
```bash
./node_modules/.bin/webpack --watch
```

1702
bonobo/ext/jupyter/js/dist/index.js vendored Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,29 @@
{
"name": "bonobo-jupyter",
"version": "0.0.1",
"description": "Jupyter integration for Bonobo",
"author": "",
"main": "src/index.js",
"repository": {
"type": "git",
"url": ""
},
"keywords": [
"jupyter",
"widgets",
"ipython",
"ipywidgets"
],
"scripts": {
"prepublish": "webpack",
"test": "echo \"Error: no test specified\" && exit 1"
},
"devDependencies": {
"json-loader": "^0.5.4",
"webpack": "^1.12.14"
},
"dependencies": {
"jupyter-js-widgets": "^2.0.9",
"underscore": "^1.8.3"
}
}

View File

@ -0,0 +1,42 @@
var widgets = require('jupyter-js-widgets');
var _ = require('underscore');
// Custom Model. Custom widgets models must at least provide default values
// for model attributes, including `_model_name`, `_view_name`, `_model_module`
// and `_view_module` when different from the base class.
//
// When serializing entire widget state for embedding, only values different from the
// defaults will be specified.
// Model backing the Bonobo widget. `value` is the list of status lines to
// display. The module names match the ones declared by the Python widget and
// the requirejs mapping ('bonobo-jupyter').
var BonoboModel = widgets.DOMWidgetModel.extend({
    defaults: _.extend({}, widgets.DOMWidgetModel.prototype.defaults, {
        _model_name: 'BonoboModel',
        _view_name: 'BonoboView',
        _model_module: 'bonobo-jupyter',
        _view_module: 'bonobo-jupyter',
        value: []
    })
});
// Custom View. Renders the widget model.
// View of the Bonobo widget: renders the model's `value` list, one entry per
// line, and re-renders whenever `value` changes.
var BonoboView = widgets.DOMWidgetView.extend({
    render: function () {
        this.value_changed();
        this.model.on('change:value', this.value_changed, this);
    },

    value_changed: function () {
        // Entries are plain text (component representations): escape them so
        // that names such as "<lambda>" are displayed instead of being parsed
        // as HTML.
        this.$el.html(
            _.map(this.model.get('value'), _.escape).join('<br>')
        );
    },
});
module.exports = {
BonoboModel: BonoboModel,
BonoboView: BonoboView
};

View File

@ -0,0 +1,9 @@
// Entry point for the unpkg bundle containing custom model definitions.
//
// It differs from the notebook bundle in that it does not need to define a
// dynamic baseURL for the static assets and may load some css that would
// already be loaded by the notebook otherwise.
// Export widget models and views, and the npm package version number.
module.exports = require('./bonobo.js');
module.exports['version'] = require('../package.json').version;

View File

@ -0,0 +1,20 @@
// This file contains the javascript that is run when the notebook is loaded.
// It contains some requirejs configuration and the `load_ipython_extension`
// which is required for any notebook extension.
// Configure requirejs
if (window.require) {
window.require.config({
map: {
"*" : {
"bonobo-jupyter": "nbextensions/bonobo-jupyter/index",
"jupyter-js-widgets": "nbextensions/jupyter-js-widgets/extension"
}
}
});
}
// Export the required load_ipython_extension
module.exports = {
load_ipython_extension: function() {}
};

View File

@ -0,0 +1,12 @@
// Entry point for the notebook bundle containing custom model definitions.
//
// Setup notebook base URL
//
// Some static assets may be required by the custom widget javascript. The base
// url for the notebook is not known at build time and is therefore computed
// dynamically.
// The extension is installed under nbextensions/bonobo-jupyter (see the
// requirejs mapping and the Python package's nbextension declaration).
__webpack_public_path__ = document.querySelector('body').getAttribute('data-base-url') + 'nbextensions/bonobo-jupyter/';
// Export widget models and views, and the npm package version number.
module.exports = require('./bonobo.js');
module.exports['version'] = require('../package.json').version;

View File

@ -0,0 +1,74 @@
var version = require('./package.json').version;
// Custom webpack loaders are generally the same for all webpack bundles, hence
// stored in a separate local variable.
var loaders = [
{test: /\.json$/, loader: 'json-loader'},
];
module.exports = [
{
// Notebook extension
//
// This bundle only contains the part of the JavaScript that is run on
// load of the notebook. This section generally only performs
// some configuration for requirejs, and provides the legacy
// "load_ipython_extension" function which is required for any notebook
// extension.
//
entry: './src/extension.js',
output: {
filename: 'extension.js',
path: '../static',
libraryTarget: 'amd'
}
},
{
// Bundle for the notebook containing the custom widget views and models
//
// This bundle contains the implementation for the custom widget views and
// custom widget.
// It must be an amd module
//
entry: './src/index.js',
output: {
filename: 'index.js',
path: '../static',
libraryTarget: 'amd'
},
devtool: 'source-map',
module: {
loaders: loaders
},
externals: ['jupyter-js-widgets']
},
{
// Embeddable jupyter-widget-example bundle
//
// This bundle is generally almost identical to the notebook bundle
// containing the custom widget views and models.
//
// The only difference is in the configuration of the webpack public path
// for the static assets.
//
// It will be automatically distributed by unpkg to work with the static
// widget embedder.
//
// The target bundle is always `dist/index.js`, which is the path required
// by the custom widget embedder.
//
entry: './src/embed.js',
output: {
filename: 'index.js',
path: './dist/',
libraryTarget: 'amd',
publicPath: 'https://unpkg.com/jupyter-widget-example@' + version + '/dist/'
},
devtool: 'source-map',
module: {
loaders: loaders
},
externals: ['jupyter-js-widgets']
}
];

View File

@ -0,0 +1,14 @@
from IPython.core.display import display
from bonobo.ext.jupyter.widget import BonoboWidget
class JupyterOutputPlugin:
    """
    Plugin showing the state of the running components in a BonoboWidget.

    The widget is created and displayed on initialization; every run (and the
    finalization) replaces its value with the ``repr()`` of each component
    context of the parent execution context.
    """

    def initialize(self, context):
        self.widget = BonoboWidget()
        display(self.widget)

    def run(self, context):
        self.widget.value = list(map(repr, context.parent.components))

    # Finalization performs one last refresh.
    finalize = run

View File

@ -0,0 +1,70 @@
define(function() { return /******/ (function(modules) { // webpackBootstrap
/******/ // The module cache
/******/ var installedModules = {};
/******/ // The require function
/******/ function __webpack_require__(moduleId) {
/******/ // Check if module is in cache
/******/ if(installedModules[moduleId])
/******/ return installedModules[moduleId].exports;
/******/ // Create a new module (and put it into the cache)
/******/ var module = installedModules[moduleId] = {
/******/ exports: {},
/******/ id: moduleId,
/******/ loaded: false
/******/ };
/******/ // Execute the module function
/******/ modules[moduleId].call(module.exports, module, module.exports, __webpack_require__);
/******/ // Flag the module as loaded
/******/ module.loaded = true;
/******/ // Return the exports of the module
/******/ return module.exports;
/******/ }
/******/ // expose the modules object (__webpack_modules__)
/******/ __webpack_require__.m = modules;
/******/ // expose the module cache
/******/ __webpack_require__.c = installedModules;
/******/ // __webpack_public_path__
/******/ __webpack_require__.p = "";
/******/ // Load entry module and return exports
/******/ return __webpack_require__(0);
/******/ })
/************************************************************************/
/******/ ([
/* 0 */
/***/ function(module, exports) {
// This file contains the javascript that is run when the notebook is loaded.
// It contains some requirejs configuration and the `load_ipython_extension`
// which is required for any notebook extension.
// Configure requirejs
if (window.require) {
window.require.config({
map: {
"*" : {
"bonobo-jupyter": "nbextensions/bonobo-jupyter/index",
"jupyter-js-widgets": "nbextensions/jupyter-js-widgets/extension"
}
}
});
}
// Export the required load_ipython_extention
module.exports = {
load_ipython_extension: function() {}
};
/***/ }
/******/ ])});;

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,11 @@
import ipywidgets as widgets
from traitlets import List, Unicode
@widgets.register('bonobo-widget.Bonobo')
class BonoboWidget(widgets.DOMWidget):
_view_name = Unicode('BonoboView').tag(sync=True)
_model_name = Unicode('BonoboModel').tag(sync=True)
_view_module = Unicode('bonobo-jupyter').tag(sync=True)
_model_module = Unicode('bonobo-jupyter').tag(sync=True)
value = List().tag(sync=True)

29
bonobo/ext/ods.py Normal file
View File

@ -0,0 +1,29 @@
from urllib.parse import urlencode
import requests # todo: make this a service so we can substitute it ?
def extract_ods(url, dataset, rows=100, **kwargs):
    """
    Create an extractor reading all the records of a dataset, page by page.

    :param url: URL of the search endpoint; the query string is appended to it.
    :param dataset: dataset identifier, sent as the ``dataset`` parameter.
    :param rows: page size, sent as the ``rows`` parameter.
    :param kwargs: additional query parameters, sent sorted by name.
    :return: a generator function named ``extract_<dataset>`` (dashes replaced
        by underscores) yielding one dict per record: the record's ``fields``
        plus its ``geometry``.
    :raises requests.HTTPError: while iterating, if a page request fails.
    """
    params = (('dataset', dataset), ('rows', rows),) + tuple(sorted(kwargs.items()))
    base_url = url + '?' + urlencode(params)

    def _extract_ods():
        start = 0
        while True:
            resp = requests.get('{}&start={start}'.format(base_url, start=start))
            # Fail loudly on HTTP errors instead of mistaking an error payload
            # for the end of the dataset.
            resp.raise_for_status()

            records = resp.json().get('records', [])
            if not records:
                break

            for row in records:
                yield {
                    **row.get('fields', {}),
                    'geometry': row.get('geometry', {})
                }

            start += rows

    _extract_ods.__name__ = 'extract_' + dataset.replace('-', '_')
    return _extract_ods

0
bonobo/ext/pandas.py Normal file
View File

38
bonobo/ext/selenium.py Normal file
View File

@ -0,0 +1,38 @@
from selenium import webdriver
from bonobo import service
USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_3) AppleWebKit/601.4.4 (KHTML, like Gecko) Version/9.0.3 Safari/601.4.4'
def create_profile(use_tor=False):
profile = webdriver.FirefoxProfile()
profile.set_preference("toolkit.startup.max_resumed_crashes", "-1")
if use_tor:
# tor connection
profile.set_preference('network.proxy.type', 1)
profile.set_preference('network.proxy.socks', '127.0.0.1')
profile.set_preference('network.proxy.socks_port', 9050)
# user agent
profile.set_preference("general.useragent.override", USER_AGENT)
return profile
def create_browser(profile):
browser = webdriver.Firefox(profile)
browser.implicitly_wait(10)
browser.set_page_load_timeout(10)
return browser
@service
def browser():
return create_browser(create_profile(use_tor=False))
@service
def torbrowser():
return create_browser(create_profile(use_tor=True))

0
bonobo/io/__init__.py Normal file
View File

36
bonobo/io/json.py Normal file
View File

@ -0,0 +1,36 @@
import json
from bonobo.util.lifecycle import with_context, set_initializer, set_finalizer
def to_json(path_or_buf):
    """
    Create a component writing every row it receives into a JSON array.

    ``path_or_buf`` is passed to ``open()``. The file is opened and the array
    started by the component's initializer, rows are written one per line,
    and the finalizer closes the array and the file.
    """
    # todo different cases + documentation
    # case 1: path_or_buf is str, we consider it filename, open and write
    # case 2: pob is None, json should be yielded
    # case 3: pob is stream, filelike, write, gogog.

    @with_context
    def _to_json(ctx, row):
        # Every row but the first is preceded by a separator.
        separator = '' if ctx.first else ',\n'
        ctx.first = False
        ctx.fp.write(separator + json.dumps(row))

    @set_initializer(_to_json)
    def _to_json_initialize(ctx):
        assert not hasattr(ctx, 'fp'), 'One at a time, baby.'
        ctx.fp = open(path_or_buf, 'w+')
        ctx.fp.write('[\n')
        ctx.first = True

    @set_finalizer(_to_json)
    def _to_json_finalize(ctx):
        ctx.fp.write('\n]')
        ctx.fp.close()
        del ctx.fp, ctx.first

    _to_json.__name__ = 'to_json'
    return _to_json

View File

@ -0,0 +1,32 @@
import functools
import pprint
def head(n=10):
    """
    Create a component letting only the first ``n`` rows it receives through.

    The returned generator function yields its argument for the first ``n``
    calls that are iterated, and nothing afterwards. The count is shared by
    all the calls made on one returned function.
    """
    seen = [0]  # mutable cell holding the number of rows seen so far

    def _head(x):
        seen[0] += 1
        if seen[0] <= n:
            yield x

    _head.__name__ = 'head({})'.format(n)
    return _head
def tee(f):
    """
    Wrap ``f`` into a pass-through function: the wrapper calls ``f(x)`` for
    its side effect, discards the result and returns ``x`` unchanged.
    """

    def wrapped(x):
        f(x)
        return x

    return functools.wraps(f)(wrapped)
log = tee(pprint.pprint)
def noop(*args, **kwargs):
pass

24
bonobo/util/compat.py Normal file
View File

@ -0,0 +1,24 @@
import struct
import sys
def is_platform_little_endian():
""" am I little endian """
return sys.byteorder == 'little'
def is_platform_windows():
    """Tell whether ``sys.platform`` is 'win32' or 'cygwin'."""
    return sys.platform in ('win32', 'cygwin')
def is_platform_linux():
    """
    Tell whether the interpreter runs on Linux.

    ``sys.platform`` is 'linux' on Python 3.3+ and was 'linux2' or 'linux3'
    before, hence the prefix test rather than an equality check.
    """
    return sys.platform.startswith('linux')
def is_platform_mac():
return sys.platform == 'darwin'
def is_platform_32bit():
    """Return True when a native pointer is narrower than 64 bits."""
    # struct format "P" is a void pointer; calcsize gives its size in bytes.
    pointer_bits = 8 * struct.calcsize("P")
    return pointer_bits < 64

View File

@ -1,17 +1,3 @@
class IntegerSequenceGenerator:
    """Simple integer sequence generator.

    Starts at 0; every ``next()`` call increments the current value and
    returns it, so the produced sequence is 1, 2, 3, ... and never ends.
    """

    def __init__(self):
        self.current = 0

    def get(self):
        """Return the current value without advancing the sequence."""
        return self.current

    def __iter__(self):
        # Iterators must return themselves from __iter__; without it the
        # object works with next() but not in ``for`` loops or with tools
        # that call iter() first.
        return self

    def __next__(self):
        """Advance the sequence by one and return the new value."""
        self.current += 1
        return self.current
def force_iterator(x):
if isinstance(x, str):
return [x]

30
bonobo/util/lifecycle.py Normal file
View File

@ -0,0 +1,30 @@
from bonobo.util import noop
def _create_lifecycle_functions(noun, verb):
getter = lambda c: getattr(c, verb, noop)
getter.__name__ = 'get_' + noun
def setter(f):
nonlocal noun, verb
assert callable(f), 'You must provide a callable to decorate with {}.'.format(noun)
def wrapper(c):
nonlocal verb, f
setattr(f, verb, c)
return f
return wrapper
setter.__name__ = 'set_' + noun
return getter, setter
# Accessor pairs for the two lifecycle hooks. set_*(f) is a decorator factory that stores the
# decorated callable on f (as f.initialize / f.finalize); get_*(c) reads that attribute back,
# falling back to noop when it is absent.
get_initializer, set_initializer = _create_lifecycle_functions('initializer', 'initialize')
get_finalizer, set_finalizer = _create_lifecycle_functions('finalizer', 'finalize')
def with_context(f):
    """Decorator flagging ``f`` with ``_with_context = True`` and returning it unchanged."""
    setattr(f, '_with_context', True)
    return f

21
bonobo/util/time.py Normal file
View File

@ -0,0 +1,21 @@
import time
class Timer(object):
    """
    Context manager used to time execution of stuff.

    Usage::

        with Timer() as timer:
            do_something()
        print(timer.duration, str(timer))

    ``duration`` is only available once the ``with`` block has exited;
    reading it earlier raises ``AttributeError`` because the finish time
    has not been recorded yet.
    """

    def __enter__(self):
        self.__start = time.time()
        # Return the timer itself so ``with Timer() as timer:`` binds a
        # usable object instead of None.
        return self

    def __exit__(self, type=None, value=None, traceback=None):
        # Error handling here
        # Nothing is returned, so an exception raised inside the block
        # still propagates after the finish time is recorded.
        self.__finish = time.time()

    @property
    def duration(self):
        """Elapsed wall-clock time between enter and exit, in seconds."""
        return self.__finish - self.__start

    def __str__(self):
        # Truncate (not round) to millisecond precision.
        return str(int(self.duration * 1000) / 1000.0) + 's'

View File

@ -10,3 +10,6 @@ class Token:
# Module-level Token instances, each created with a human-readable label.
# NOTE(review): BEGIN/END presumably mark stream boundaries and NEW/RUNNING/TERMINATED
# lifecycle states -- confirm against the code that consumes them.
BEGIN = Token('Begin')
END = Token('End')
NEW = Token('New')
RUNNING = Token('Running')
TERMINATED = Token('Terminated')

View File

@ -1,5 +1,9 @@
from bonobo.core.graph import Graph
from bonobo.core.strategy import NaiveStrategy, ExecutorStrategy
import time
from random import randint
from bonobo.core.graphs import Graph
from bonobo.core.strategies.executor import ThreadPoolExecutorStrategy
from bonobo.ext.console import ConsoleOutputPlugin
def extract():
@ -9,19 +13,20 @@ def extract():
# NOTE(review): this span appears to interleave the removed and the added lines of a diff hunk;
# confirm which return statement belongs to the current version before relying on it.
def transform(s):
# As written, this first return exits immediately, so the statements below it never run.
return s.title()
# Sleeps for a randomly chosen 0 or 1 (time.sleep argument), then appends that value to the title-cased string.
wait = randint(0, 1)
time.sleep(wait)
return s.title() + ' ' + str(wait)
def load(s):
    """Final step of the demo chain: print the received value to standard output."""
    print(s)
# Strategy class instantiated by the last run in the entry point below.
Strategy = ThreadPoolExecutorStrategy
# Demo entry point: builds a graph with the extract -> transform -> load chain and executes it.
# NOTE(review): the three strategy runs below look like removed and added diff lines shown
# together (NaiveStrategy / ExecutorStrategy vs. Strategy with plugins) -- confirm which are current.
if __name__ == '__main__':
etl = Graph()
etl.add_chain(extract, transform, load)
s = NaiveStrategy()
s.execute(etl)
s = ExecutorStrategy()
s.execute(etl)
s = Strategy()
s.execute(etl, plugins=[ConsoleOutputPlugin()])

View File

@ -0,0 +1,60 @@
import json
from blessings import Terminal
from pycountry import countries
from bonobo.ext.console import console_run
from bonobo.ext.ods import extract_ods
from bonobo.util import tee
from bonobo.io.json import to_json
# Dataset identifier passed to extract_ods and shown as the source in display().
DATASET = 'fablabs-in-the-world'
# Records search endpoint passed to extract_ods.
SEARCH_URL = 'https://datanova.laposte.fr/api/records/1.0/search/'
# Search endpoint with the dataset pre-selected as a query parameter.
# NOTE(review): not referenced in the visible part of this script.
URL = SEARCH_URL + '?dataset=' + DATASET
# NOTE(review): presumably a page size for the API; not referenced in the visible code -- confirm.
ROWS = 100
# Terminal handle used by display() for bold / blue text styling.
t = Terminal()
def _getlink(x):
return x.get('url', None)
def normalize(row):
    """Return a copy of ``row`` with decoded links and a resolved country name.

    * ``links``: the JSON-encoded list stored in the row is decoded and
      reduced to the non-empty ``url`` values of its entries. A missing or
      empty value yields an empty list instead of failing in ``json.loads``.
    * ``country``: name of the country matching the upper-cased
      ``country_code``, or ``None`` when the code is missing or unknown.

    :param row: mapping describing one record; it is not modified.
    :return: a new dict with every original key plus ``links`` and ``country``.
    """
    raw_links = row.get('links')
    entries = json.loads(raw_links) if raw_links else []

    code = (row.get('country_code') or '').upper()
    try:
        country = countries.get(alpha_2=code)
    except LookupError:
        # Depending on the pycountry release, an unknown code either raises
        # KeyError (a LookupError) or returns None; both end up as None here.
        country = None

    return {
        **row,
        'links': list(filter(None, map(_getlink, entries))),
        'country': country.name if country is not None else None,
    }
def filter_france(row):
    """Yield ``row`` only when its ``'country'`` value equals ``'France'``."""
    if row.get('country') != 'France':
        return
    yield row
def display(row):
    """Print a terminal-styled summary (name, address, links, geometry, source) of ``row``."""
    print(t.bold(row.get('name')))

    # Postal code and city share one address component; empty parts are dropped.
    locality = ' '.join(filter(None, (row.get('postal_code', None), row.get('city', None))))
    address = [part for part in (locality, row.get('county', None), row.get('country')) if part]

    print(' - {}: {address}'.format(t.blue('address'), address=', '.join(address)))
    print(' - {}: {links}'.format(t.blue('links'), links=', '.join(row['links'])))
    print(' - {}: {geometry}'.format(t.blue('geometry'), **row))
    print(' - {}: {source}'.format(t.blue('source'), source='datanova/' + DATASET))
if __name__ == '__main__':
    # Steps of the example pipeline, in execution order: extract the dataset,
    # normalize each row, keep French entries, print them, write them as JSON.
    pipeline = (
        extract_ods(SEARCH_URL, DATASET, timezone='Europe/Paris'),
        normalize,
        filter_france,
        tee(display),
        to_json('fablabs.json'),
    )
    console_run(*pipeline, output=True)

View File

@ -1,2 +1,5 @@
[bdist_wheel]
universal=1
[metadata]
description-file = README.rst

View File

@ -5,11 +5,13 @@ from setuptools import setup, find_packages
# Split text into its lines, stripped of surrounding whitespace, dropping empty ones.
tolines = lambda c: [line for line in (s.strip() for s in c.split('\n')) if line]
def read(filename, flt=None):
    """Return the stripped content of ``filename``, optionally post-processed.

    :param filename: path of the text file to read.
    :param flt: when callable, it is applied to the stripped content and its
        result is returned; any non-callable value is ignored.
    """
    with open(filename) as f:
        content = f.read().strip()
    if callable(flt):
        return flt(content)
    return content
try:
version = read('version.txt')
except:
@ -32,7 +34,16 @@ setup(
'pytest >=3,<4',
'pytest-cov >=2.4,<2.5',
'sphinx',
'sphinx_rtd_theme']},
'sphinx_rtd_theme'],
'jupyter': ['ipywidgets >=6.0.0.beta5']
},
data_files=[
('share/jupyter/nbextensions/bonobo', [
'bonobo/ext/jupyter/static/extension.js',
'bonobo/ext/jupyter/static/index.js',
'bonobo/ext/jupyter/static/index.js.map',
]),
],
url='https://github.com/hartym/bonobo',
download_url='https://github.com/hartym/bonobo'.format(version=version),
)

View File

@ -1,7 +1,7 @@
import pytest
from bonobo.core.graph import Graph
from bonobo.core.tokens import BEGIN
from bonobo.core.graphs import Graph
from bonobo.util.tokens import BEGIN
# Trivial one-argument function returning its input unchanged.
identity = lambda x: x

View File

@ -19,8 +19,8 @@ from queue import Empty
import pytest
from bonobo.core.errors import InactiveWritableError, InactiveReadableError
from bonobo.core.io import Input
from bonobo.core.tokens import BEGIN, END
from bonobo.core.inputs import Input
from bonobo.util.tokens import BEGIN, END
def test_input_runlevels():

View File

@ -0,0 +1,24 @@
from bonobo import inject, service
class MyFoo:
    """Empty marker class instantiated by the service under test."""
def test_service_is_singleton():
    """A service returns one shared instance; ``define()`` creates an independent one."""

    @service
    def foo():
        return MyFoo()

    # Two calls to the same service hand back the very same object.
    shared = foo()
    assert shared is foo()

    @inject(foo)
    def bar(myfoo):
        # The injected argument is the service's shared instance.
        assert myfoo is foo()

    bar()

    # A re-defined service builds the same type but keeps its own instance.
    other_foo = foo.define()
    assert type(foo()) is type(other_foo())
    assert other_foo() is not foo()
View File

@ -1,4 +1,4 @@
from bonobo.core.tokens import Token
from bonobo.util.tokens import Token
def test_token_repr():