Merge pull request #12 from hartym/master

Major update to allow extensions.
This commit is contained in:
Romain Dorgueil
2017-01-10 23:21:09 +01:00
committed by GitHub
43 changed files with 698 additions and 266 deletions

1
.gitignore vendored
View File

@ -9,6 +9,7 @@
*.py[cod]
*.so
*.spec
.*.sw?
.Python
.cache
.coverage

View File

@ -1,3 +1,4 @@
[style]
based_on_style = pep8
column_limit = 120
dedent_closing_brackets = true

View File

@ -1,7 +1,7 @@
# This file has been auto-generated.
# All changes will be lost, see Projectfile.
#
# Updated at 2016-12-28 15:50:31.026587
# Updated at 2017-01-10 23:15:21.478899
PYTHON ?= $(shell which python)
PYTHON_BASENAME ?= $(shell basename $(PYTHON))
@ -24,13 +24,13 @@ YAPF_OPTIONS ?= -rip
# Installs the local project dependencies.
install: $(VIRTUAL_ENV)
if [ -z "$(QUICK)" ]; then \
$(PIP) install -Ur $(PYTHON_REQUIREMENTS_FILE) ; \
$(PIP) install -U pip wheel -r $(PYTHON_REQUIREMENTS_FILE) ; \
fi
# Installs the local project dependencies, including development-only libraries.
install-dev: $(VIRTUAL_ENV)
if [ -z "$(QUICK)" ]; then \
$(PIP) install -Ur $(PYTHON_REQUIREMENTS_DEV_FILE) ; \
$(PIP) install -U pip wheel -r $(PYTHON_REQUIREMENTS_DEV_FILE) ; \
fi
# Cleans up the local mess.
@ -41,7 +41,7 @@ clean:
# Setup the local virtualenv, or use the one provided by the current environment.
$(VIRTUAL_ENV):
virtualenv -p $(PYTHON) $(VIRTUAL_ENV)
$(PIP) install -U pip\>=8.1.2,\<9 wheel\>=0.29,\<1.0
$(PIP) install -U pip wheel
ln -fs $(VIRTUAL_ENV)/bin/activate activate-$(PYTHON_BASENAME)
lint: install-dev

View File

@ -23,6 +23,10 @@ enable_features = {
install_requires = [
'blessings >=1.6,<1.7',
'psutil >=5.0,<5.1',
'requests >=2.12,<2.13',
'stevedore >=1.19,<1.20',
'toolz >=0.8,<0.9',
]
extras_require = {
@ -31,7 +35,7 @@ extras_require = {
'ipywidgets >=6.0.0.beta5'
],
'dev': [
'coverage >=4.2,<4.3',
'coverage >=4.3,<4.4',
'mock >=2.0,<2.1',
'nose >=1.3,<1.4',
'pylint >=1.6,<1.7',
@ -51,6 +55,18 @@ data_files = [
]),
]
entry_points = {
'console_scripts': [
'bonobo = bonobo.commands:entrypoint'
],
'bonobo.commands': [
'init = bonobo.commands.init:register',
'run = bonobo.commands.run:register',
],
'edgy.project.features': [
'bonobo = bonobo.ext.edgy.project.feature:BonoboFeature'
]
}
@listen('edgy.project.feature.make.on_generate', priority=10)
def on_make_generate_docker_targets(event):

View File

@ -19,21 +19,15 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import sys
assert (sys.version_info >= (3, 5)), 'Python 3.5+ is required to use Bonobo.'
from ._version import __version__
from .core import *
from .io import CsvReader, CsvWriter, FileReader, FileWriter, JsonReader, JsonWriter
from .util import *
PY35 = (sys.version_info >= (3, 5))
assert PY35, 'Python 3.5+ is required to use Bonobo.'
# Version infos
with open(os.path.realpath(os.path.join(os.path.dirname(__file__), '../version.txt'))) as f:
__version__ = f.read().strip()
__all__ = [
'Bag',
'CsvReader',
@ -47,12 +41,14 @@ __all__ = [
'NaiveStrategy',
'ProcessPoolExecutorStrategy',
'ThreadPoolExecutorStrategy',
'__version__',
'console_run',
'head',
'inject',
'jupyter_run',
'limit',
'log',
'noop',
'pprint',
'run',
'service',
'tee',

4
bonobo/__main__.py Normal file
View File

@ -0,0 +1,4 @@
from bonobo.commands import entrypoint
if __name__ == '__main__':
entrypoint()

1
bonobo/_version.py Normal file
View File

@ -0,0 +1 @@
__version__ = '0.1.3'

View File

@ -0,0 +1,22 @@
import argparse
from stevedore import ExtensionManager
def entrypoint():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='command')
subparsers.required = True
def register_extension(ext):
parser = subparsers.add_parser(ext.name)
command = ext.plugin(parser)
parser.set_defaults(command=command)
mgr = ExtensionManager(namespace='bonobo.commands', )
mgr.map(register_extension)
args = parser.parse_args().__dict__
command = args.pop('command')
command(**args)

18
bonobo/commands/init.py Normal file
View File

@ -0,0 +1,18 @@
import os
def execute():
try:
import edgy.project
except ImportError as exc:
raise ImportError(
'You must install "edgy.project" to use this command.\n\n $ pip install edgy.project\n'
) from exc
from edgy.project.__main__ import handle_init
return handle_init(os.path.join(os.getcwd(), 'Projectfile'))
def register(parser):
return execute

29
bonobo/commands/run.py Normal file
View File

@ -0,0 +1,29 @@
import argparse
from bonobo import Graph, console_run
def execute(file):
with file:
code = compile(file.read(), file.name, 'exec')
context = {}
try:
exec(code, context)
except Exception as exc:
raise
graphs = dict((k, v) for k, v in context.items() if isinstance(v, Graph))
assert len(graphs) == 1, 'Having more than one graph definition in one file is unsupported for now, but it is ' \
'something that will be implemented in the future. '
name, graph = list(graphs.items())[0]
return console_run(graph)
def register(parser):
parser.add_argument('file', type=argparse.FileType())
return execute

View File

@ -1,6 +1,6 @@
""" Core required libraries. """
from .bags import Bag
from .bags import Bag, ErrorBag
from .graphs import Graph
from .services import inject, service
from .strategies.executor import ThreadPoolExecutorStrategy, ProcessPoolExecutorStrategy
@ -8,6 +8,7 @@ from .strategies.naive import NaiveStrategy
__all__ = [
'Bag',
'ErrorBag',
'Graph',
'NaiveStrategy',
'ProcessPoolExecutorStrategy',

View File

@ -2,6 +2,11 @@ import itertools
from bonobo.util.tokens import Token
__all__ = [
'Bag',
'ErrorBag',
]
INHERIT_INPUT = Token('InheritInput')
@ -18,7 +23,8 @@ class Bag:
return self._args
return (
*self._parent.args,
*self._args, )
*self._args,
)
@property
def kwargs(self):
@ -34,7 +40,24 @@ class Bag:
return self._flags
def apply(self, func_or_iter, *args, **kwargs):
return func_or_iter(*args, *self.args, **kwargs, **self.kwargs)
if callable(func_or_iter):
return func_or_iter(*args, *self.args, **kwargs, **self.kwargs)
if len(args) == 0 and len(kwargs) == 0:
try:
iter(func_or_iter)
def generator():
nonlocal func_or_iter
for x in func_or_iter:
yield x
return generator()
except TypeError as exc:
print('nop')
raise TypeError('Could not apply bag to {}.'.format(func_or_iter)) from exc
raise TypeError('Could not apply bag to {}.'.format(func_or_iter))
def extend(self, *args, **kwargs):
return type(self)(*args, _parent=self, **kwargs)
@ -48,7 +71,13 @@ class Bag:
def __repr__(self):
return '<{} ({})>'.format(
type(self).__name__, ', '.join(
itertools.chain(
map(repr, self.args),
('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()), )))
type(self).__name__, ', '.
join(itertools.chain(
map(repr, self.args),
('{}={}'.format(k, repr(v)) for k, v in self.kwargs.items()),
))
)
class ErrorBag(Bag):
pass

View File

@ -3,7 +3,7 @@ from functools import partial
from queue import Empty
from time import sleep
from bonobo.core.bags import Bag, INHERIT_INPUT
from bonobo.core.bags import Bag, INHERIT_INPUT, ErrorBag
from bonobo.core.errors import InactiveReadableError
from bonobo.core.inputs import Input
from bonobo.core.stats import WithStatistics
@ -11,11 +11,31 @@ from bonobo.util.lifecycle import get_initializer, get_finalizer
from bonobo.util.tokens import BEGIN, END, NEW, RUNNING, TERMINATED, NOT_MODIFIED
def get_name(mixed):
try:
return mixed.__name__
except AttributeError:
return type(mixed).__name__
def create_component_context(component, parent):
try:
CustomComponentContext = component.Context
except AttributeError:
return ComponentExecutionContext(component, parent=parent)
if ComponentExecutionContext in CustomComponentContext.__mro__:
bases = (CustomComponentContext, )
else:
bases = (CustomComponentContext, ComponentExecutionContext)
return type(get_name(component).title() + 'ExecutionContext', bases, {})(component, parent=parent)
class ExecutionContext:
def __init__(self, graph, plugins=None):
self.graph = graph
self.components = [ComponentExecutionContext(component, self) for component in self.graph.components]
self.components = [create_component_context(component, parent=self) for component in self.graph.components]
self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()]
for i, component_context in enumerate(self):
@ -23,8 +43,10 @@ class ExecutionContext:
component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)]
except KeyError:
continue
component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True)
component_context.input.on_end = partial(component_context.send, END, _control=True)
component_context.input.on_finalize = partial(component_context.finalize)
def __getitem__(self, item):
return self.components[item]
@ -63,17 +85,19 @@ class AbstractLoopContext:
def initialize(self):
# pylint: disable=broad-except
try:
get_initializer(self.wrapped)(self)
initializer = get_initializer(self.wrapped)
except Exception as exc:
self.handle_error(exc, traceback.format_exc())
else:
return initializer(self)
def loop(self):
"""Generic loop. A bit boring. """
while self.alive:
self._loop()
self.step()
sleep(self.PERIOD)
def _loop(self):
def step(self):
"""
TODO xxx this is a step, not a loop
"""
@ -83,9 +107,11 @@ class AbstractLoopContext:
"""Generic finalizer. """
# pylint: disable=broad-except
try:
get_finalizer(self.wrapped)(self)
finalizer = get_finalizer(self.wrapped)
except Exception as exc:
self.handle_error(exc, traceback.format_exc())
return self.handle_error(exc, traceback.format_exc())
else:
return finalizer(self)
def handle_error(self, exc, trace):
"""
@ -112,7 +138,7 @@ class PluginExecutionContext(AbstractLoopContext):
def shutdown(self):
self.alive = False
def _loop(self):
def step(self):
try:
self.wrapped.run(self)
except Exception as exc: # pylint: disable=broad-except
@ -157,13 +183,17 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext):
return (
(
'in',
self.stats['in'], ),
self.stats['in'],
),
(
'out',
self.stats['out'], ),
self.stats['out'],
),
(
'err',
self.stats['err'], ), )
self.stats['err'],
),
)
def recv(self, *messages):
"""
@ -195,7 +225,7 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext):
self.stats['in'] += 1
return row
def _call(self, bag):
def apply_on(self, bag):
# todo add timer
if getattr(self.component, '_with_context', False):
return bag.apply(self.component, self)
@ -203,10 +233,29 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext):
def initialize(self):
# pylint: disable=broad-except
assert self.state is NEW, ('A {} can only be run once, and thus is expected to be in {} state at '
'initialization time.').format(type(self).__name__, NEW)
assert self.state is NEW, (
'A {} can only be run once, and thus is expected to be in {} state at '
'initialization time.'
).format(type(self).__name__, NEW)
self.state = RUNNING
super().initialize()
try:
initializer_outputs = super().initialize()
self.handle(None, initializer_outputs)
except Exception as exc:
self.handle_error(exc, traceback.format_exc())
def finalize(self):
# pylint: disable=broad-except
assert self.state is RUNNING, ('A {} must be in {} state at finalization time.'
).format(type(self).__name__, RUNNING)
self.state = TERMINATED
try:
finalizer_outputs = super().finalize()
self.handle(None, finalizer_outputs)
except Exception as exc:
self.handle_error(exc, traceback.format_exc())
def loop(self):
while True:
@ -228,34 +277,39 @@ class ComponentExecutionContext(WithStatistics, AbstractLoopContext):
output channel."""
input_bag = self.get()
outputs = self._call(input_bag)
outputs = self.apply_on(input_bag)
self.handle(input_bag, outputs)
def run(self):
self.initialize()
self.loop()
def handle(self, input_bag, outputs):
# self._exec_time += timer.duration
# Put data onto output channels
try:
outputs = _iter(outputs)
except TypeError:
except TypeError: # not an iterator
if outputs:
self.send(_resolve(input_bag, outputs))
if isinstance(outputs, ErrorBag):
outputs.apply(self.handle_error)
else:
self.send(_resolve(input_bag, outputs))
else:
# case with no result, an execution went through anyway, use for stats.
# self._exec_count += 1
pass
else:
while True:
while True: # iterator
try:
output = next(outputs)
except StopIteration:
break
self.send(_resolve(input_bag, output))
def finalize(self):
# pylint: disable=broad-except
assert self.state is RUNNING, ('A {} must be in {} state at finalization time.').format(
type(self).__name__, RUNNING)
self.state = TERMINATED
super().finalize()
else:
if isinstance(output, ErrorBag):
output.apply(self.handle_error)
else:
self.send(_resolve(input_bag, output))
def _iter(mixed):

View File

@ -20,9 +20,12 @@ class AbstractError(NotImplementedError):
"""Abstract error is a convenient error to declare a method as "being left as an exercise for the reader"."""
def __init__(self, method):
super().__init__('Call to abstract method {class_name}.{method_name}(...): missing implementation.'.format(
class_name=method.__self__.__name__,
method_name=method.__name__, ))
super().__init__(
'Call to abstract method {class_name}.{method_name}(...): missing implementation.'.format(
class_name=method.__self__.__name__,
method_name=method.__name__,
)
)
class InactiveIOError(IOError):
@ -39,9 +42,12 @@ class InactiveWritableError(InactiveIOError):
class ValidationError(RuntimeError):
def __init__(self, inst, message):
super(ValidationError, self).__init__('Validation error in {class_name}: {message}'.format(
class_name=type(inst).__name__,
message=message, ))
super(ValidationError, self).__init__(
'Validation error in {class_name}: {message}'.format(
class_name=type(inst).__name__,
message=message,
)
)
class ProhibitedOperationError(RuntimeError):

View File

@ -48,12 +48,17 @@ class Input(Queue, Readable, Writable):
self._runlevel = 0
self._writable_runlevel = 0
self.on_initialize = noop
self.on_begin = noop
self.on_end = noop
self.on_finalize = noop
def put(self, data, block=True, timeout=None):
# Begin token is a metadata to raise the input runlevel.
if data == BEGIN:
if not self._runlevel:
self.on_initialize()
self._runlevel += 1
self._writable_runlevel += 1
@ -78,14 +83,18 @@ class Input(Queue, Readable, Writable):
data = Queue.get(self, block, timeout)
if data == END:
if self._runlevel == 1:
self.on_finalize()
self._runlevel -= 1
# callback
self.on_end()
if not self.alive:
raise InactiveReadableError('Cannot get() on an inactive {} (runlevel just reached 0).'.format(
Readable.__name__))
raise InactiveReadableError(
'Cannot get() on an inactive {} (runlevel just reached 0).'.format(Readable.__name__)
)
return self.get(block, timeout)
return data

View File

@ -45,9 +45,11 @@ def inject(*iargs, **ikwargs):
def wrapper(target):
@functools.wraps(target)
def wrapped(*args, **kwargs):
return target(*itertools.chain(map(resolve, iargs), args),
**{ ** kwargs, ** {k: resolve(v)
for k, v in ikwargs.items()}})
return target(
*itertools.chain(map(resolve, iargs), args),
**{ ** kwargs, ** {k: resolve(v)
for k, v in ikwargs.items()}}
)
return wrapped

View File

@ -2,10 +2,10 @@ import time
from concurrent.futures import Executor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
from bonobo.core.strategies.base import Strategy
from bonobo.util.tokens import BEGIN, END
from ..bags import Bag
@ -48,3 +48,31 @@ class ThreadPoolExecutorStrategy(ExecutorStrategy):
class ProcessPoolExecutorStrategy(ExecutorStrategy):
executor_factory = ProcessPoolExecutor
class ThreadCollectionStrategy(Strategy):
def execute(self, graph, *args, plugins=None, **kwargs):
context = self.create_context(graph, plugins=plugins)
context.recv(BEGIN, Bag(), END)
threads = []
# for plugin_context in context.plugins:
# threads.append(executor.submit(plugin_context.run))
for component_context in context.components:
thread = Thread(target=component_context.run)
threads.append(thread)
thread.start()
# XXX TODO PLUGINS
while context.alive and len(threads):
time.sleep(0.1)
threads = list(filter(lambda thread: thread.is_alive, threads))
# for plugin_context in context.plugins:
# plugin_context.shutdown()
# executor.shutdown()
return context

View File

@ -53,9 +53,10 @@ class ConsoleOutputPlugin(Plugin):
def _write(self, context, rewind):
profile, debug = False, False
if profile:
append = (('Memory', '{0:.2f} Mb'.format(memory_usage())),
# ('Total time', '{0} s'.format(execution_time(harness))),
)
append = (
('Memory', '{0:.2f} Mb'.format(memory_usage())),
# ('Total time', '{0} s'.format(execution_time(harness))),
)
else:
append = ()
self.write(context, prefix=self.prefix, append=append, debug=debug, profile=profile, rewind=rewind)
@ -77,25 +78,35 @@ class ConsoleOutputPlugin(Plugin):
for i, component in enumerate(context):
if component.alive:
_line = ''.join((
t.black('({})'.format(i + 1)),
' ',
t.bold(t.white('+')),
' ',
component.name,
' ',
component.get_stats_as_string(
debug=debug, profile=profile),
' ', ))
_line = ''.join(
(
t.black('({})'.format(i + 1)),
' ',
t.bold(t.white('+')),
' ',
component.name,
' ',
component.get_stats_as_string(
debug=debug, profile=profile
),
' ',
)
)
else:
_line = t.black(''.join((
'({})'.format(i + 1),
' - ',
component.name,
' ',
component.get_stats_as_string(
debug=debug, profile=profile),
' ', )))
_line = t.black(
''.join(
(
'({})'.format(i + 1),
' - ',
component.name,
' ',
component.get_stats_as_string(
debug=debug, profile=profile
),
' ',
)
)
)
print(prefix + _line + t.clear_eol)
if append:

View File

@ -6,9 +6,11 @@ try:
except ImportError as e:
import logging
logging.exception('You must install Jupyter to use the bonobo Jupyter extension. Easiest way is to install the '
'optional "jupyter" dependencies with «pip install bonobo[jupyter]», but you can also install a '
'specific version by yourself.')
logging.exception(
'You must install Jupyter to use the bonobo Jupyter extension. Easiest way is to install the '
'optional "jupyter" dependencies with «pip install bonobo[jupyter]», but you can also install a '
'specific version by yourself.'
)
class JupyterOutputPlugin(Plugin):

View File

@ -3,17 +3,20 @@ from urllib.parse import urlencode
import requests # todo: make this a service so we can substitute it ?
def from_opendatasoft_api(dataset=None,
endpoint='{scheme}://{netloc}{path}',
scheme='https',
netloc='data.opendatasoft.com',
path='/api/records/1.0/search/',
rows=100,
**kwargs):
def from_opendatasoft_api(
dataset=None,
endpoint='{scheme}://{netloc}{path}',
scheme='https',
netloc='data.opendatasoft.com',
path='/api/records/1.0/search/',
rows=100,
**kwargs
):
path = path if path.startswith('/') else '/' + path
params = (
('dataset', dataset),
('rows', rows), ) + tuple(sorted(kwargs.items()))
('rows', rows),
) + tuple(sorted(kwargs.items()))
base_url = endpoint.format(scheme=scheme, netloc=netloc, path=path) + '?' + urlencode(params)
def _extract_ods():

View File

@ -59,7 +59,8 @@ class CsvReader(CsvHandler, FileReader):
if len(row) != field_count:
raise ValueError('Got a line with %d fields, expecting %d.' % (
len(row),
field_count, ))
field_count,
))
yield dict(zip(headers, row))

View File

@ -36,6 +36,7 @@ class FileHandler:
:param ctx:
:return:
"""
assert not hasattr(ctx, 'file'), 'A file pointer is already in the context... I do not know what to say...'
ctx.file = self.open()
@ -95,8 +96,8 @@ class FileWriter(Writer):
mode = 'w+'
def initialize(self, ctx):
super().initialize(ctx)
ctx.line = 0
return super().initialize(ctx)
def write(self, ctx, row):
"""
@ -114,4 +115,4 @@ class FileWriter(Writer):
def finalize(self, ctx):
del ctx.line
super().finalize(ctx)
return super().finalize(ctx)

View File

@ -1,47 +1,116 @@
""" Various simple utilities. """
import functools
import pprint
from pprint import pprint as _pprint
import blessings
from .tokens import NOT_MODIFIED
from .helpers import run, console_run, jupyter_run
from .tokens import NOT_MODIFIED
__all__ = [
'NOT_MODIFIED',
'console_run',
'head',
'jupyter_run',
'limit',
'log',
'noop',
'pprint',
'run',
'tee',
]
def head(n=10):
def identity(x):
return x
def limit(n=10):
i = 0
def _head(x):
def _limit(*args, **kwargs):
nonlocal i, n
i += 1
if i <= n:
yield x
yield NOT_MODIFIED
_head.__name__ = 'head({})'.format(n)
return _head
_limit.__name__ = 'limit({})'.format(n)
return _limit
def tee(f):
@functools.wraps(f)
def wrapped(x):
def wrapped(*args, **kwargs):
nonlocal f
f(x)
return x
f(*args, **kwargs)
return NOT_MODIFIED
return wrapped
log = tee(pprint.pprint)
log = tee(_pprint)
def pprint(title_keys=('title', 'name', 'id'), print_values=True, sort=True):
term = blessings.Terminal()
def _pprint(*args, **kwargs):
nonlocal title_keys, term, sort, print_values
row = args[0]
for key in title_keys:
if key in row:
print(term.bold(row.get(key)))
break
if print_values:
for k in sorted(row) if sort else row:
print(
'{t.blue}{k}{t.normal} : {t.black}({tp}){t.normal} {v}{t.clear_eol}'.format(
k=k, v=repr(row[k]), t=term, tp=type(row[k]).__name__
)
)
yield NOT_MODIFIED
_pprint.__name__ = 'pprint'
return _pprint
'''
def writehr(self, label=None):
width = t.width or 80
if label:
label = str(label)
sys.stderr.write(t.black('·' * 4) + shade('{') + label + shade('}') + t.black('·' * (width - (6+len(label)) - 1)) + '\n')
else:
sys.stderr.write(t.black('·' * (width-1) + '\n'))
def writeln(self, s):
"""Output method."""
sys.stderr.write(self.format(s) + '\n')
def initialize(self):
self.lineno = 0
def transform(self, hash, channel=STDIN):
"""Actual transformation."""
self.lineno += 1
if not self.condition or self.condition(hash):
hash = hash.copy()
hash = hash if not isinstance(self.field_filter, collections.Callable) else hash.restrict(self.field_filter)
if self.clean:
hash = hash.restrict(lambda k: len(k) and k[0] != '_')
self.writehr(self.lineno)
self.writeln(hash)
self.writehr()
sys.stderr.write('\n')
yield hash
'''
def noop(*args, **kwargs): # pylint: disable=unused-argument

View File

@ -1,20 +1,25 @@
def run(*chain, plugins=None):
def run(*chain, plugins=None, strategy=None):
from bonobo import Graph, ThreadPoolExecutorStrategy
graph = Graph()
graph.add_chain(*chain)
if len(chain) == 1 and isinstance(chain[0], Graph):
graph = chain[0]
elif len(chain) >= 1:
graph = Graph()
graph.add_chain(*chain)
else:
raise RuntimeError('Empty chain.')
executor = ThreadPoolExecutorStrategy()
executor = (strategy or ThreadPoolExecutorStrategy)()
return executor.execute(graph, plugins=plugins or [])
def console_run(*chain, output=True, plugins=None):
def console_run(*chain, output=True, plugins=None, strategy=None):
from bonobo.ext.console import ConsoleOutputPlugin
return run(*chain, plugins=(plugins or []) + [ConsoleOutputPlugin()] if output else [])
return run(*chain, plugins=(plugins or []) + [ConsoleOutputPlugin()] if output else [], strategy=strategy)
def jupyter_run(*chain, plugins=None):
def jupyter_run(*chain, plugins=None, strategy=None):
from bonobo.ext.jupyter import JupyterOutputPlugin
return run(*chain, plugins=(plugins or []) + [JupyterOutputPlugin()])
return run(*chain, plugins=(plugins or []) + [JupyterOutputPlugin()], strategy=strategy)

3
docs/_static/custom.css vendored Normal file
View File

@ -0,0 +1,3 @@
svg {
border: 2px solid green
}

2
docs/_static/graphs.css vendored Normal file
View File

@ -0,0 +1,2 @@
.node {
}

View File

@ -42,8 +42,7 @@ author = 'Romain Dorgueil'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
version = bonobo.__version__
release = bonobo.__version__
version = release = bonobo.__version__
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
@ -154,8 +153,12 @@ man_pages = [(master_doc, 'bonobo', 'Bonobo Documentation', [author], 1)]
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [(master_doc, 'Bonobo', 'Bonobo Documentation', author, 'Bonobo',
'One line description of project.', 'Miscellaneous'), ]
texinfo_documents = [
(
master_doc, 'Bonobo', 'Bonobo Documentation', author, 'Bonobo', 'One line description of project.',
'Miscellaneous'
),
]
# -- Options for Epub output ----------------------------------------------

33
docs/guide/crawlers.rst Normal file
View File

@ -0,0 +1,33 @@
Web crawlers with Bonobo
========================
.. todo:: Bonobo-Selenium is at a very alpha stage, and things will change. This section is here to give a brief
overview but is neither complete nor definitive.
Writing web crawlers with Bonobo and Selenium is easy.
First, install **bonobo-selenium**:
.. code-block:: shell-session
$ pip install bonobo-selenium
The idea is to have one callable crawl one thing and delegate drill downs to callables further away in the chain.
An example chain could be:
.. graphviz::
digraph {
rankdir = LR;
login -> paginate -> list -> details -> "ExcelWriter(...)";
}
Where each step would do the following:
* `login()` is in charge to open an authenticated session in the browser.
* `paginate()` open each page of a fictive list and pass it to next.
* `list()` take every list item and yield it.
* `details()` extract the data you're interested in.
* ... and the writer saves it somewhere.

View File

@ -1,4 +1,8 @@
Guides
======
.. todo:: write the fucking doc!
.. toctree::
:maxdepth: 2
purity
crawlers

128
docs/guide/purity.rst Normal file
View File

@ -0,0 +1,128 @@
Pure components and space complexity
====================================
The nature of components, and how the data flow from one to another, make them not so easy to write correctly.
Hopefully, with a few hints, you will be able to understand why and how they should be written.
The major problem we have is that one message can go through more than one component, and at the same time. If you
wanna be safe, you tend to :func:`copy.copy()` everything between two calls to two different components, but that
will mean that a lot of useless memory space would be taken for copies that are never modified.
Instead of that, we chosed the oposite: copies are never made, and you should not modify in place the inputs of your
component before yielding them, and that mostly means that you want to recreate dicts and lists before yielding (or
returning) them. Numeric values, strings and tuples being immutable in python, modifying a variable of one of those
type will already return a different instance.
Numbers
=======
You can't be wrong with numbers. All of the following are correct.
.. code-block:: python
def do_your_number_thing(n: int) -> int:
return n
def do_your_number_thing(n: int) -> int:
yield n
def do_your_number_thing(n: int) -> int:
return n + 1
def do_your_number_thing(n: int) -> int:
yield n + 1
def do_your_number_thing(n: int) -> int:
# correct, but bad style
n += 1
return n
def do_your_number_thing(n: int) -> int:
# correct, but bad style
n += 1
yield n
The same is true with other numeric types, so don't be shy. Operate like crazy, my friend.
Tuples
======
Tuples are immutable, so you risk nothing.
.. code-block:: python
def do_your_tuple_thing(t: tuple) -> tuple:
return ('foo', ) + t
def do_your_tuple_thing(t: tuple) -> tuple:
return t + ('bar', )
def do_your_tuple_thing(t: tuple) -> tuple:
# correct, but bad style
t += ('baaaz', )
return t
Strings
=======
You know the drill, strings are immutable, blablabla ... Examples left as an exercise for the reader.
Dicts
=====
So, now it gets interesting. Dicts are mutable. It means that you can mess things up badly here if you're not cautious.
For example, doing the following may cause unexpected problems:
.. code-block:: python
def mutate_my_dict_like_crazy(d: dict) -> dict:
# Bad! Don't do that!
d.update({
'foo': compute_something()
})
# Still bad! Don't mutate the dict!
d['bar']: compute_anotherthing()
return d
The problem is easy to understand: as **Bonobo** won't make copies of your dict, the same dict will be passed along the
transformation graph, and mutations will be seen in components downwards the output, but also upward. Let's see
a more obvious example of something you should not do:
.. code-block:: python
def mutate_my_dict_and_yield() -> dict:
d = {}
for i in range(100):
# Bad! Don't do that!
d['index'] = i
yield d
Here, the same dict is yielded in each iteration, and its state when the next component in chain is called is undetermined.
Now let's see how to do it correctly:
.. code-block:: python
def new_dicts_like_crazy(d: dict) -> dict:
# Creating a new dict is correct.
return {
**d,
'foo': compute_something(),
'bar': compute_anotherthing(),
}
def new_dict_and_yield() -> dict:
d = {}
for i in range(100):
# Different dict each time.
yield {
'index': i
}
I hear you think «Yeah, but if I create like millions of dicts ...». The answer is simple. Using dicts like this will
create a lot, but also free a lot because as soon as all the future components that take this dict as input are done,
the dict will be garbage collected. Youplaboum!

View File

@ -1,22 +0,0 @@
bonobo.ext.console package
==========================
Submodules
----------
bonobo.ext.console.plugin module
--------------------------------
.. automodule:: bonobo.ext.console.plugin
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: bonobo.ext.console
:members:
:undoc-members:
:show-inheritance:

View File

@ -1,30 +0,0 @@
bonobo.ext.jupyter package
==========================
Submodules
----------
bonobo.ext.jupyter.plugin module
--------------------------------
.. automodule:: bonobo.ext.jupyter.plugin
:members:
:undoc-members:
:show-inheritance:
bonobo.ext.jupyter.widget module
--------------------------------
.. automodule:: bonobo.ext.jupyter.widget
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: bonobo.ext.jupyter
:members:
:undoc-members:
:show-inheritance:

View File

@ -1,46 +0,0 @@
bonobo.ext package
==================
Subpackages
-----------
.. toctree::
bonobo.ext.console
bonobo.ext.jupyter
Submodules
----------
bonobo.ext.couchdb_ module
--------------------------
.. automodule:: bonobo.ext.couchdb_
:members:
:undoc-members:
:show-inheritance:
bonobo.ext.opendatasoft module
------------------------------
.. automodule:: bonobo.ext.opendatasoft
:members:
:undoc-members:
:show-inheritance:
bonobo.ext.selenium module
--------------------------
.. automodule:: bonobo.ext.selenium
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: bonobo.ext
:members:
:undoc-members:
:show-inheritance:

View File

@ -58,7 +58,10 @@ Let's chain the three components together and run the transformation:
digraph {
rankdir = LR;
"generate_data" -> "uppercase" -> "output";
stylesheet = "../_static/graphs.css";
BEGIN [shape="point"];
BEGIN -> "generate_data" -> "uppercase" -> "output";
}
We use the :func:`bonobo.run` helper that hides the underlying object composition necessary to actually run the

View File

@ -2,7 +2,7 @@ import json
from blessings import Terminal
from bonobo import console_run, tee, JsonWriter
from bonobo import console_run, tee, JsonWriter, Graph
from bonobo.ext.opendatasoft import from_opendatasoft_api
try:
@ -40,10 +40,14 @@ def display(row):
print(t.bold(row.get('name')))
address = list(
filter(None, (
' '.join(filter(None, (row.get('postal_code', None), row.get('city', None)))),
row.get('county', None),
row.get('country'), )))
filter(
None, (
' '.join(filter(None, (row.get('postal_code', None), row.get('city', None)))),
row.get('county', None),
row.get('country'),
)
)
)
print(' - {}: {address}'.format(t.blue('address'), address=', '.join(address)))
print(' - {}: {links}'.format(t.blue('links'), links=', '.join(row['links'])))
@ -51,12 +55,15 @@ def display(row):
print(' - {}: {source}'.format(t.blue('source'), source='datanova/' + API_DATASET))
graph = Graph(
from_opendatasoft_api(
API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'
),
normalize,
filter_france,
tee(display),
JsonWriter('fablabs.json'),
)
if __name__ == '__main__':
console_run(
from_opendatasoft_api(
API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'),
normalize,
filter_france,
tee(display),
JsonWriter('fablabs.json'),
output=True, )
console_run(graph, output=True)

View File

@ -8,8 +8,10 @@ OUTPUT_FILENAME = realpath(join(dirname(__file__), 'datasets/cheap_coffeeshops_i
console_run(
from_opendatasoft_api(
'liste-des-cafes-a-un-euro', netloc='opendata.paris.fr'),
'liste-des-cafes-a-un-euro', netloc='opendata.paris.fr'
),
lambda row: '{nom_du_cafe}, {adresse}, {arrondissement} Paris, France'.format(**row),
FileWriter(OUTPUT_FILENAME), )
FileWriter(OUTPUT_FILENAME),
)
print('Import done, read {} for results.'.format(OUTPUT_FILENAME))

View File

@ -9,7 +9,8 @@ def yield_from(*args):
graph = Graph(
lambda: (x for x in ('foo', 'bar', 'baz')),
str.upper,
print, )
print,
)
# Use a thread pool.
executor = ThreadPoolExecutorStrategy()

View File

@ -1,8 +1,11 @@
# This file is autogenerated by edgy.project code generator.
# All changes will be overwritten.
import os
from setuptools import setup, find_packages
root_dir = os.path.dirname(os.path.abspath(__file__))
tolines = lambda c: list(filter(None, map(lambda s: s.strip(), c.split('\n'))))
@ -12,31 +15,56 @@ def read(filename, flt=None):
return flt(content) if callable(flt) else content
# Py3 compatibility hacks, borrowed from IPython.
try:
version = read('version.txt')
except: # pylint: disable=bare-except
version = 'dev'
execfile
except NameError:
def execfile(fname, globs, locs=None):
locs = locs or globs
exec(compile(open(fname).read(), fname, "exec"), globs, locs)
version_ns = {}
execfile(os.path.join(root_dir, 'bonobo/_version.py'), version_ns)
version = version_ns.get('__version__', 'dev')
setup(
name='bonobo',
description='Bonobo',
license='Apache License, Version 2.0',
install_requires=['blessings >=1.6,<1.7', 'psutil >=5.0,<5.1'],
install_requires=[
'blessings >=1.6,<1.7', 'psutil >=5.0,<5.1', 'requests >=2.12,<2.13',
'stevedore >=1.19,<1.20', 'toolz >=0.8,<0.9'
],
version=version,
long_description=read('README.rst'),
classifiers=read('classifiers.txt', tolines),
packages=find_packages(exclude=['ez_setup', 'example', 'test']),
include_package_data=True,
data_files=[('share/jupyter/nbextensions/bonobo-jupyter', [
'bonobo/ext/jupyter/static/extension.js', 'bonobo/ext/jupyter/static/index.js',
'bonobo/ext/jupyter/static/extension.js',
'bonobo/ext/jupyter/static/index.js',
'bonobo/ext/jupyter/static/index.js.map'
])],
extras_require={
'dev': [
'coverage >=4.2,<4.3', 'mock >=2.0,<2.1', 'nose >=1.3,<1.4', 'pylint >=1.6,<1.7', 'pytest >=3,<4',
'pytest-cov >=2.4,<2.5', 'sphinx', 'sphinx_rtd_theme', 'yapf'
'coverage >=4.3,<4.4', 'mock >=2.0,<2.1', 'nose >=1.3,<1.4',
'pylint >=1.6,<1.7', 'pytest >=3,<4', 'pytest-cov >=2.4,<2.5',
'sphinx', 'sphinx_rtd_theme', 'yapf'
],
'jupyter': ['jupyter >=1.0,<1.1', 'ipywidgets >=6.0.0.beta5']
},
entry_points={
'bonobo.commands': [
'init = bonobo.commands.init:register',
'run = bonobo.commands.run:register'
],
'console_scripts': ['bonobo = bonobo.commands:entrypoint'],
'edgy.project.features':
['bonobo = '
'bonobo.ext.edgy.project.feature:BonoboFeature']
},
url='https://bonobo-project.org/',
download_url='https://github.com/python-bonobo/bonobo/tarball/{version}'.format(version=version), )
download_url='https://github.com/python-bonobo/bonobo/tarball/{version}'.
format(version=version), )

View File

@ -5,7 +5,8 @@ from bonobo.core.bags import INHERIT_INPUT
args = (
'foo',
'bar', )
'bar',
)
kwargs = dict(acme='corp')
@ -40,13 +41,15 @@ def test_inherit():
assert bag2.args == (
'a',
'b', )
'b',
)
assert bag2.kwargs == {'a': 1, 'b': 2}
assert INHERIT_INPUT in bag2.flags
assert bag3.args == (
'a',
'c', )
'c',
)
assert bag3.kwargs == {'a': 1, 'c': 3}
assert bag3.flags is ()
@ -57,7 +60,8 @@ def test_inherit():
bag4.set_parent(bag)
assert bag4.args == (
'a',
'd', )
'd',
)
assert bag4.kwargs == {'a': 1, 'd': 4}
assert bag4.flags is ()
@ -65,7 +69,8 @@ def test_inherit():
assert bag4.args == (
'a',
'c',
'd', )
'd',
)
assert bag4.kwargs == {'a': 1, 'c': 3, 'd': 4}
assert bag4.flags is ()

View File

@ -5,7 +5,8 @@ class MyThingWithStats(WithStatistics):
def get_stats(self, *args, **kwargs):
return (
('foo', 42),
('bar', 69), )
('bar', 69),
)
def test_with_statistics():

View File

@ -19,17 +19,18 @@ class ResponseMock:
def test_read_from_opendatasoft_api():
extract = from_opendatasoft_api('http://example.com/', 'test-a-set')
with patch(
'requests.get', return_value=ResponseMock([
{
'fields': {
'foo': 'bar'
}
},
{
'fields': {
'foo': 'zab'
}
},
])):
'requests.get', return_value=ResponseMock([
{
'fields': {
'foo': 'bar'
}
},
{
'fields': {
'foo': 'zab'
}
},
])
):
for line in extract():
assert 'foo' in line

View File

@ -11,7 +11,8 @@ from bonobo.util.tokens import BEGIN, END
[
(('ACME', ), 'ACME'), # one line...
(('Foo', 'Bar', 'Baz'), 'Foo\nBar\nBaz'), # more than one line...
])
]
)
def test_file_writer_in_context(tmpdir, lines, output):
file = tmpdir.join('output.txt')

View File

@ -1 +0,0 @@
0.1.2